Files
agenciapsilmno/database-novo/schema/03_functions/realtime.sql

722 lines
24 KiB
PL/PgSQL

-- =============================================================================
-- AgenciaPsi — Functions — realtime schema
-- =============================================================================
CREATE FUNCTION realtime.apply_rls(wal jsonb, max_record_bytes integer DEFAULT (1024 * 1024)) RETURNS SETOF realtime.wal_rls
LANGUAGE plpgsql
AS $$
declare
-- Regclass of the table e.g. public.notes
entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;
-- I, U, D, T: insert, update ...
action realtime.action = (
case wal ->> 'action'
when 'I' then 'INSERT'
when 'U' then 'UPDATE'
when 'D' then 'DELETE'
else 'ERROR'
end
);
-- Is row level security enabled for the table
is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;
subscriptions realtime.subscription[] = array_agg(subs)
from
realtime.subscription subs
where
subs.entity = entity_;
-- Subscription vars
roles regrole[] = array_agg(distinct us.claims_role::text)
from
unnest(subscriptions) us;
working_role regrole;
claimed_role regrole;
claims jsonb;
subscription_id uuid;
subscription_has_access bool;
visible_to_subscription_ids uuid[] = '{}';
-- structured info for wal's columns
columns realtime.wal_column[];
-- previous identity values for update/delete
old_columns realtime.wal_column[];
error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;
-- Primary jsonb output for record
output jsonb;
begin
perform set_config('role', null, true);
columns =
array_agg(
(
x->>'name',
x->>'type',
x->>'typeoid',
realtime.cast(
(x->'value') #>> '{}',
coalesce(
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
(x->>'type')::regtype
)
),
(pks ->> 'name') is not null,
true
)::realtime.wal_column
)
from
jsonb_array_elements(wal -> 'columns') x
left join jsonb_array_elements(wal -> 'pk') pks
on (x ->> 'name') = (pks ->> 'name');
old_columns =
array_agg(
(
x->>'name',
x->>'type',
x->>'typeoid',
realtime.cast(
(x->'value') #>> '{}',
coalesce(
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
(x->>'type')::regtype
)
),
(pks ->> 'name') is not null,
true
)::realtime.wal_column
)
from
jsonb_array_elements(wal -> 'identity') x
left join jsonb_array_elements(wal -> 'pk') pks
on (x ->> 'name') = (pks ->> 'name');
for working_role in select * from unnest(roles) loop
-- Update `is_selectable` for columns and old_columns
columns =
array_agg(
(
c.name,
c.type_name,
c.type_oid,
c.value,
c.is_pkey,
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
)::realtime.wal_column
)
from
unnest(columns) c;
old_columns =
array_agg(
(
c.name,
c.type_name,
c.type_oid,
c.value,
c.is_pkey,
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
)::realtime.wal_column
)
from
unnest(old_columns) c;
if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then
return next (
jsonb_build_object(
'schema', wal ->> 'schema',
'table', wal ->> 'table',
'type', action
),
is_rls_enabled,
-- subscriptions is already filtered by entity
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
array['Error 400: Bad Request, no primary key']
)::realtime.wal_rls;
-- The claims role does not have SELECT permission to the primary key of entity
elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then
return next (
jsonb_build_object(
'schema', wal ->> 'schema',
'table', wal ->> 'table',
'type', action
),
is_rls_enabled,
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
array['Error 401: Unauthorized']
)::realtime.wal_rls;
else
output = jsonb_build_object(
'schema', wal ->> 'schema',
'table', wal ->> 'table',
'type', action,
'commit_timestamp', to_char(
((wal ->> 'timestamp')::timestamptz at time zone 'utc'),
'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'
),
'columns', (
select
jsonb_agg(
jsonb_build_object(
'name', pa.attname,
'type', pt.typname
)
order by pa.attnum asc
)
from
pg_attribute pa
join pg_type pt
on pa.atttypid = pt.oid
where
attrelid = entity_
and attnum > 0
and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT')
)
)
-- Add "record" key for insert and update
|| case
when action in ('INSERT', 'UPDATE') then
jsonb_build_object(
'record',
(
select
jsonb_object_agg(
-- if unchanged toast, get column name and value from old record
coalesce((c).name, (oc).name),
case
when (c).name is null then (oc).value
else (c).value
end
)
from
unnest(columns) c
full outer join unnest(old_columns) oc
on (c).name = (oc).name
where
coalesce((c).is_selectable, (oc).is_selectable)
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
)
)
else '{}'::jsonb
end
-- Add "old_record" key for update and delete
|| case
when action = 'UPDATE' then
jsonb_build_object(
'old_record',
(
select jsonb_object_agg((c).name, (c).value)
from unnest(old_columns) c
where
(c).is_selectable
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
)
)
when action = 'DELETE' then
jsonb_build_object(
'old_record',
(
select jsonb_object_agg((c).name, (c).value)
from unnest(old_columns) c
where
(c).is_selectable
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64))
and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey
)
)
else '{}'::jsonb
end;
-- Create the prepared statement
if is_rls_enabled and action <> 'DELETE' then
if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then
deallocate walrus_rls_stmt;
end if;
execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns);
end if;
visible_to_subscription_ids = '{}';
for subscription_id, claims in (
select
subs.subscription_id,
subs.claims
from
unnest(subscriptions) subs
where
subs.entity = entity_
and subs.claims_role = working_role
and (
realtime.is_visible_through_filters(columns, subs.filters)
or (
action = 'DELETE'
and realtime.is_visible_through_filters(old_columns, subs.filters)
)
)
) loop
if not is_rls_enabled or action = 'DELETE' then
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
else
-- Check if RLS allows the role to see the record
perform
-- Trim leading and trailing quotes from working_role because set_config
-- doesn't recognize the role as valid if they are included
set_config('role', trim(both '"' from working_role::text), true),
set_config('request.jwt.claims', claims::text, true);
execute 'execute walrus_rls_stmt' into subscription_has_access;
if subscription_has_access then
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
end if;
end if;
end loop;
perform set_config('role', null, true);
return next (
output,
is_rls_enabled,
visible_to_subscription_ids,
case
when error_record_exceeds_max_size then array['Error 413: Payload Too Large']
else '{}'
end
)::realtime.wal_rls;
end if;
end loop;
perform set_config('role', null, true);
end;
$$;
ALTER FUNCTION realtime.apply_rls(wal jsonb, max_record_bytes integer) OWNER TO supabase_admin;
--
-- Name: broadcast_changes(text, text, text, text, text, record, record, text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.broadcast_changes(topic_name text, event_name text, operation text, table_name text, table_schema text, new record, old record, level text DEFAULT 'ROW'::text) RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
-- Declare a variable to hold the JSONB representation of the row
row_data jsonb := '{}'::jsonb;
BEGIN
IF level = 'STATEMENT' THEN
RAISE EXCEPTION 'function can only be triggered for each row, not for each statement';
END IF;
-- Check the operation type and handle accordingly
IF operation = 'INSERT' OR operation = 'UPDATE' OR operation = 'DELETE' THEN
row_data := jsonb_build_object('old_record', OLD, 'record', NEW, 'operation', operation, 'table', table_name, 'schema', table_schema);
PERFORM realtime.send (row_data, event_name, topic_name);
ELSE
RAISE EXCEPTION 'Unexpected operation type: %', operation;
END IF;
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'Failed to process the row: %', SQLERRM;
END;
$$;
ALTER FUNCTION realtime.broadcast_changes(topic_name text, event_name text, operation text, table_name text, table_schema text, new record, old record, level text) OWNER TO supabase_admin;
--
-- Name: build_prepared_statement_sql(text, regclass, realtime.wal_column[]); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) RETURNS text
LANGUAGE sql
AS $$
/*
Builds a sql string that, if executed, creates a prepared statement to
tests retrive a row from *entity* by its primary key columns.
Example
select realtime.build_prepared_statement_sql('public.notes', '{"id"}'::text[], '{"bigint"}'::text[])
*/
select
'prepare ' || prepared_statement_name || ' as
select
exists(
select
1
from
' || entity || '
where
' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value #>> '{}') , ' and ') || '
)'
from
unnest(columns) pkc
where
pkc.is_pkey
group by
entity
$$;
ALTER FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) OWNER TO supabase_admin;
--
-- Name: cast(text, regtype); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime."cast"(val text, type_ regtype) RETURNS jsonb
LANGUAGE plpgsql IMMUTABLE
AS $$
declare
res jsonb;
begin
execute format('select to_jsonb(%L::'|| type_::text || ')', val) into res;
return res;
end
$$;
ALTER FUNCTION realtime."cast"(val text, type_ regtype) OWNER TO supabase_admin;
--
-- Name: check_equality_op(realtime.equality_op, regtype, text, text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) RETURNS boolean
LANGUAGE plpgsql IMMUTABLE
AS $$
/*
Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness
*/
declare
op_symbol text = (
case
when op = 'eq' then '='
when op = 'neq' then '!='
when op = 'lt' then '<'
when op = 'lte' then '<='
when op = 'gt' then '>'
when op = 'gte' then '>='
when op = 'in' then '= any'
else 'UNKNOWN OP'
end
);
res boolean;
begin
execute format(
'select %L::'|| type_::text || ' ' || op_symbol
|| ' ( %L::'
|| (
case
when op = 'in' then type_::text || '[]'
else type_::text end
)
|| ')', val_1, val_2) into res;
return res;
end;
$$;
ALTER FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) OWNER TO supabase_admin;
--
-- Name: is_visible_through_filters(realtime.wal_column[], realtime.user_defined_filter[]); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) RETURNS boolean
LANGUAGE sql IMMUTABLE
AS $_$
/*
Should the record be visible (true) or filtered out (false) after *filters* are applied
*/
select
-- Default to allowed when no filters present
$2 is null -- no filters. this should not happen because subscriptions has a default
or array_length($2, 1) is null -- array length of an empty array is null
or bool_and(
coalesce(
realtime.check_equality_op(
op:=f.op,
type_:=coalesce(
col.type_oid::regtype, -- null when wal2json version <= 2.4
col.type_name::regtype
),
-- cast jsonb to text
val_1:=col.value #>> '{}',
val_2:=f.value
),
false -- if null, filter does not match
)
)
from
unnest(filters) f
join unnest(columns) col
on f.column_name = col.name;
$_$;
ALTER FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) OWNER TO supabase_admin;
--
-- Name: list_changes(name, name, integer, integer); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) RETURNS SETOF realtime.wal_rls
LANGUAGE sql
SET log_min_messages TO 'fatal'
AS $$
with pub as (
select
concat_ws(
',',
case when bool_or(pubinsert) then 'insert' else null end,
case when bool_or(pubupdate) then 'update' else null end,
case when bool_or(pubdelete) then 'delete' else null end
) as w2j_actions,
coalesce(
string_agg(
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
','
) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
''
) w2j_add_tables
from
pg_publication pp
left join pg_publication_tables ppt
on pp.pubname = ppt.pubname
where
pp.pubname = publication
group by
pp.pubname
limit 1
),
w2j as (
select
x.*, pub.w2j_add_tables
from
pub,
pg_logical_slot_get_changes(
slot_name, null, max_changes,
'include-pk', 'true',
'include-transaction', 'false',
'include-timestamp', 'true',
'include-type-oids', 'true',
'format-version', '2',
'actions', pub.w2j_actions,
'add-tables', pub.w2j_add_tables
) x
)
select
xyz.wal,
xyz.is_rls_enabled,
xyz.subscription_ids,
xyz.errors
from
w2j,
realtime.apply_rls(
wal := w2j.data::jsonb,
max_record_bytes := max_record_bytes
) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
w2j.w2j_add_tables <> ''
and xyz.subscription_ids[1] is not null
$$;
ALTER FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) OWNER TO supabase_admin;
--
-- Name: quote_wal2json(regclass); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.quote_wal2json(entity regclass) RETURNS text
LANGUAGE sql IMMUTABLE STRICT
AS $$
select
(
select string_agg('' || ch,'')
from unnest(string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx)
where
not (x.idx = 1 and x.ch = '"')
and not (
x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)
and x.ch = '"'
)
)
|| '.'
|| (
select string_agg('' || ch,'')
from unnest(string_to_array(pc.relname::text, null)) with ordinality x(ch, idx)
where
not (x.idx = 1 and x.ch = '"')
and not (
x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)
and x.ch = '"'
)
)
from
pg_class pc
join pg_namespace nsp
on pc.relnamespace = nsp.oid
where
pc.oid = entity
$$;
ALTER FUNCTION realtime.quote_wal2json(entity regclass) OWNER TO supabase_admin;
--
-- Name: send(jsonb, text, text, boolean); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true) RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
generated_id uuid;
final_payload jsonb;
BEGIN
BEGIN
-- Generate a new UUID for the id
generated_id := gen_random_uuid();
-- Check if payload has an 'id' key, if not, add the generated UUID
IF payload ? 'id' THEN
final_payload := payload;
ELSE
final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));
END IF;
-- Set the topic configuration
EXECUTE format('SET LOCAL realtime.topic TO %L', topic);
-- Attempt to insert the message
INSERT INTO realtime.messages (id, payload, event, topic, private, extension)
VALUES (generated_id, final_payload, event, topic, private, 'broadcast');
EXCEPTION
WHEN OTHERS THEN
-- Capture and notify the error
RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;
END;
END;
$$;
ALTER FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean) OWNER TO supabase_admin;
--
-- Name: subscription_check_filters(); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.subscription_check_filters() RETURNS trigger
LANGUAGE plpgsql
AS $$
/*
Validates that the user defined filters for a subscription:
- refer to valid columns that the claimed role may access
- values are coercable to the correct column type
*/
declare
col_names text[] = coalesce(
array_agg(c.column_name order by c.ordinal_position),
'{}'::text[]
)
from
information_schema.columns c
where
format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity
and pg_catalog.has_column_privilege(
(new.claims ->> 'role'),
format('%I.%I', c.table_schema, c.table_name)::regclass,
c.column_name,
'SELECT'
);
filter realtime.user_defined_filter;
col_type regtype;
in_val jsonb;
begin
for filter in select * from unnest(new.filters) loop
-- Filtered column is valid
if not filter.column_name = any(col_names) then
raise exception 'invalid column for filter %', filter.column_name;
end if;
-- Type is sanitized and safe for string interpolation
col_type = (
select atttypid::regtype
from pg_catalog.pg_attribute
where attrelid = new.entity
and attname = filter.column_name
);
if col_type is null then
raise exception 'failed to lookup type for column %', filter.column_name;
end if;
-- Set maximum number of entries for in filter
if filter.op = 'in'::realtime.equality_op then
in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);
if coalesce(jsonb_array_length(in_val), 0) > 100 then
raise exception 'too many values for `in` filter. Maximum 100';
end if;
else
-- raises an exception if value is not coercable to type
perform realtime.cast(filter.value, col_type);
end if;
end loop;
-- Apply consistent order to filters so the unique constraint on
-- (subscription_id, entity, filters) can't be tricked by a different filter order
new.filters = coalesce(
array_agg(f order by f.column_name, f.op, f.value),
'{}'
) from unnest(new.filters) f;
return new;
end;
$$;
ALTER FUNCTION realtime.subscription_check_filters() OWNER TO supabase_admin;
--
-- Name: to_regrole(text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin
--
CREATE FUNCTION realtime.to_regrole(role_name text) RETURNS regrole
LANGUAGE sql IMMUTABLE
AS $$ select role_name::regrole $$;
ALTER FUNCTION realtime.to_regrole(role_name text) OWNER TO supabase_admin;
--
-- Name: topic(); Type: FUNCTION; Schema: realtime; Owner: supabase_realtime_admin
--
CREATE FUNCTION realtime.topic() RETURNS text
LANGUAGE sql STABLE
AS $$
select nullif(current_setting('realtime.topic', true), '')::text;
$$;
ALTER FUNCTION realtime.topic() OWNER TO supabase_realtime_admin;
--
-- Name: can_insert_object(text, text, uuid, jsonb); Type: FUNCTION; Schema: storage; Owner: supabase_storage_admin
--