-- ============================================================================= -- AgenciaPsi — Functions — realtime schema -- ============================================================================= CREATE FUNCTION realtime.apply_rls(wal jsonb, max_record_bytes integer DEFAULT (1024 * 1024)) RETURNS SETOF realtime.wal_rls LANGUAGE plpgsql AS $$ declare -- Regclass of the table e.g. public.notes entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; -- I, U, D, T: insert, update ... action realtime.action = ( case wal ->> 'action' when 'I' then 'INSERT' when 'U' then 'UPDATE' when 'D' then 'DELETE' else 'ERROR' end ); -- Is row level security enabled for the table is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; subscriptions realtime.subscription[] = array_agg(subs) from realtime.subscription subs where subs.entity = entity_; -- Subscription vars roles regrole[] = array_agg(distinct us.claims_role::text) from unnest(subscriptions) us; working_role regrole; claimed_role regrole; claims jsonb; subscription_id uuid; subscription_has_access bool; visible_to_subscription_ids uuid[] = '{}'; -- structured info for wal's columns columns realtime.wal_column[]; -- previous identity values for update/delete old_columns realtime.wal_column[]; error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; -- Primary jsonb output for record output jsonb; begin perform set_config('role', null, true); columns = array_agg( ( x->>'name', x->>'type', x->>'typeoid', realtime.cast( (x->'value') #>> '{}', coalesce( (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 (x->>'type')::regtype ) ), (pks ->> 'name') is not null, true )::realtime.wal_column ) from jsonb_array_elements(wal -> 'columns') x left join jsonb_array_elements(wal -> 'pk') pks on (x ->> 'name') = (pks ->> 'name'); old_columns = array_agg( ( x->>'name', x->>'type', x->>'typeoid', realtime.cast( (x->'value') #>> '{}', coalesce( (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 (x->>'type')::regtype ) ), (pks ->> 'name') is not null, true )::realtime.wal_column ) from jsonb_array_elements(wal -> 'identity') x left join jsonb_array_elements(wal -> 'pk') pks on (x ->> 'name') = (pks ->> 'name'); for working_role in select * from unnest(roles) loop -- Update `is_selectable` for columns and old_columns columns = array_agg( ( c.name, c.type_name, c.type_oid, c.value, c.is_pkey, pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') )::realtime.wal_column ) from unnest(columns) c; old_columns = array_agg( ( c.name, c.type_name, c.type_oid, c.value, c.is_pkey, pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') )::realtime.wal_column ) from unnest(old_columns) c; if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then return next ( jsonb_build_object( 'schema', wal ->> 'schema', 'table', wal ->> 'table', 'type', action ), is_rls_enabled, -- subscriptions is already filtered by entity (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), array['Error 400: Bad Request, no primary key'] )::realtime.wal_rls; -- The claims role does not have SELECT permission to the primary key of entity elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then return next ( jsonb_build_object( 'schema', wal ->> 'schema', 'table', wal ->> 'table', 'type', action ), is_rls_enabled, (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), array['Error 401: Unauthorized'] )::realtime.wal_rls; else output = jsonb_build_object( 'schema', wal ->> 'schema', 'table', wal ->> 'table', 'type', action, 'commit_timestamp', to_char( ((wal ->> 'timestamp')::timestamptz at time zone 'utc'), 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"' ), 'columns', ( select jsonb_agg( jsonb_build_object( 'name', pa.attname, 'type', pt.typname ) order by pa.attnum asc ) from pg_attribute pa join pg_type pt on pa.atttypid = pt.oid where attrelid = entity_ and attnum > 0 and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') ) ) -- Add "record" key for insert and update || case when action in ('INSERT', 'UPDATE') then jsonb_build_object( 'record', ( select jsonb_object_agg( -- if unchanged toast, get column name and value from old record coalesce((c).name, (oc).name), case when (c).name is null then (oc).value else (c).value end ) from unnest(columns) c full outer join unnest(old_columns) oc on (c).name = (oc).name where coalesce((c).is_selectable, (oc).is_selectable) and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) ) ) else '{}'::jsonb end -- Add "old_record" key for update and delete || case when action = 'UPDATE' then jsonb_build_object( 'old_record', ( select jsonb_object_agg((c).name, (c).value) from unnest(old_columns) c where (c).is_selectable and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) ) ) when action = 'DELETE' then jsonb_build_object( 'old_record', ( select jsonb_object_agg((c).name, (c).value) from unnest(old_columns) c where (c).is_selectable and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey ) ) else '{}'::jsonb end; -- Create the prepared statement if is_rls_enabled and action <> 'DELETE' then if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then deallocate walrus_rls_stmt; end if; execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); end if; visible_to_subscription_ids = '{}'; for subscription_id, claims in ( select subs.subscription_id, subs.claims from unnest(subscriptions) subs where subs.entity = entity_ and subs.claims_role = working_role and ( realtime.is_visible_through_filters(columns, subs.filters) or ( action = 'DELETE' and realtime.is_visible_through_filters(old_columns, subs.filters) ) ) ) loop if not is_rls_enabled or action = 'DELETE' then visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; else -- Check if RLS allows the role to see the record perform -- Trim leading and trailing quotes from working_role because set_config -- doesn't recognize the role as valid if they are included set_config('role', trim(both '"' from working_role::text), true), set_config('request.jwt.claims', claims::text, true); execute 'execute walrus_rls_stmt' into subscription_has_access; if subscription_has_access then visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; end if; end if; end loop; perform set_config('role', null, true); return next ( output, is_rls_enabled, visible_to_subscription_ids, case when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] else '{}' end )::realtime.wal_rls; end if; end loop; perform set_config('role', null, true); end; $$; ALTER FUNCTION realtime.apply_rls(wal jsonb, max_record_bytes integer) OWNER TO supabase_admin; -- -- Name: broadcast_changes(text, text, text, text, text, record, record, text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.broadcast_changes(topic_name text, event_name text, operation text, table_name text, table_schema text, new record, old record, level text DEFAULT 'ROW'::text) RETURNS void LANGUAGE plpgsql AS $$ DECLARE -- Declare a variable to hold the JSONB representation of the row row_data jsonb := '{}'::jsonb; BEGIN IF level = 'STATEMENT' THEN RAISE EXCEPTION 'function can only be triggered for each row, not for each statement'; END IF; -- Check the operation type and handle accordingly IF operation = 'INSERT' OR operation = 'UPDATE' OR operation = 'DELETE' THEN row_data := jsonb_build_object('old_record', OLD, 'record', NEW, 'operation', operation, 'table', table_name, 'schema', table_schema); PERFORM realtime.send (row_data, event_name, topic_name); ELSE RAISE EXCEPTION 'Unexpected operation type: %', operation; END IF; EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION 'Failed to process the row: %', SQLERRM; END; $$; ALTER FUNCTION realtime.broadcast_changes(topic_name text, event_name text, operation text, table_name text, table_schema text, new record, old record, level text) OWNER TO supabase_admin; -- -- Name: build_prepared_statement_sql(text, regclass, realtime.wal_column[]); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) RETURNS text LANGUAGE sql AS $$ /* Builds a sql string that, if executed, creates a prepared statement to tests retrive a row from *entity* by its primary key columns. Example select realtime.build_prepared_statement_sql('public.notes', '{"id"}'::text[], '{"bigint"}'::text[]) */ select 'prepare ' || prepared_statement_name || ' as select exists( select 1 from ' || entity || ' where ' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value #>> '{}') , ' and ') || ' )' from unnest(columns) pkc where pkc.is_pkey group by entity $$; ALTER FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[]) OWNER TO supabase_admin; -- -- Name: cast(text, regtype); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime."cast"(val text, type_ regtype) RETURNS jsonb LANGUAGE plpgsql IMMUTABLE AS $$ declare res jsonb; begin execute format('select to_jsonb(%L::'|| type_::text || ')', val) into res; return res; end $$; ALTER FUNCTION realtime."cast"(val text, type_ regtype) OWNER TO supabase_admin; -- -- Name: check_equality_op(realtime.equality_op, regtype, text, text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) RETURNS boolean LANGUAGE plpgsql IMMUTABLE AS $$ /* Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness */ declare op_symbol text = ( case when op = 'eq' then '=' when op = 'neq' then '!=' when op = 'lt' then '<' when op = 'lte' then '<=' when op = 'gt' then '>' when op = 'gte' then '>=' when op = 'in' then '= any' else 'UNKNOWN OP' end ); res boolean; begin execute format( 'select %L::'|| type_::text || ' ' || op_symbol || ' ( %L::' || ( case when op = 'in' then type_::text || '[]' else type_::text end ) || ')', val_1, val_2) into res; return res; end; $$; ALTER FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text) OWNER TO supabase_admin; -- -- Name: is_visible_through_filters(realtime.wal_column[], realtime.user_defined_filter[]); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) RETURNS boolean LANGUAGE sql IMMUTABLE AS $_$ /* Should the record be visible (true) or filtered out (false) after *filters* are applied */ select -- Default to allowed when no filters present $2 is null -- no filters. this should not happen because subscriptions has a default or array_length($2, 1) is null -- array length of an empty array is null or bool_and( coalesce( realtime.check_equality_op( op:=f.op, type_:=coalesce( col.type_oid::regtype, -- null when wal2json version <= 2.4 col.type_name::regtype ), -- cast jsonb to text val_1:=col.value #>> '{}', val_2:=f.value ), false -- if null, filter does not match ) ) from unnest(filters) f join unnest(columns) col on f.column_name = col.name; $_$; ALTER FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[]) OWNER TO supabase_admin; -- -- Name: list_changes(name, name, integer, integer); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) RETURNS SETOF realtime.wal_rls LANGUAGE sql SET log_min_messages TO 'fatal' AS $$ with pub as ( select concat_ws( ',', case when bool_or(pubinsert) then 'insert' else null end, case when bool_or(pubupdate) then 'update' else null end, case when bool_or(pubdelete) then 'delete' else null end ) as w2j_actions, coalesce( string_agg( realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), ',' ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'), '' ) w2j_add_tables from pg_publication pp left join pg_publication_tables ppt on pp.pubname = ppt.pubname where pp.pubname = publication group by pp.pubname limit 1 ), w2j as ( select x.*, pub.w2j_add_tables from pub, pg_logical_slot_get_changes( slot_name, null, max_changes, 'include-pk', 'true', 'include-transaction', 'false', 'include-timestamp', 'true', 'include-type-oids', 'true', 'format-version', '2', 'actions', pub.w2j_actions, 'add-tables', pub.w2j_add_tables ) x ) select xyz.wal, xyz.is_rls_enabled, xyz.subscription_ids, xyz.errors from w2j, realtime.apply_rls( wal := w2j.data::jsonb, max_record_bytes := max_record_bytes ) xyz(wal, is_rls_enabled, subscription_ids, errors) where w2j.w2j_add_tables <> '' and xyz.subscription_ids[1] is not null $$; ALTER FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer) OWNER TO supabase_admin; -- -- Name: quote_wal2json(regclass); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.quote_wal2json(entity regclass) RETURNS text LANGUAGE sql IMMUTABLE STRICT AS $$ select ( select string_agg('' || ch,'') from unnest(string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx) where not (x.idx = 1 and x.ch = '"') and not ( x.idx = array_length(string_to_array(nsp.nspname::text, null), 1) and x.ch = '"' ) ) || '.' || ( select string_agg('' || ch,'') from unnest(string_to_array(pc.relname::text, null)) with ordinality x(ch, idx) where not (x.idx = 1 and x.ch = '"') and not ( x.idx = array_length(string_to_array(nsp.nspname::text, null), 1) and x.ch = '"' ) ) from pg_class pc join pg_namespace nsp on pc.relnamespace = nsp.oid where pc.oid = entity $$; ALTER FUNCTION realtime.quote_wal2json(entity regclass) OWNER TO supabase_admin; -- -- Name: send(jsonb, text, text, boolean); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true) RETURNS void LANGUAGE plpgsql AS $$ DECLARE generated_id uuid; final_payload jsonb; BEGIN BEGIN -- Generate a new UUID for the id generated_id := gen_random_uuid(); -- Check if payload has an 'id' key, if not, add the generated UUID IF payload ? 'id' THEN final_payload := payload; ELSE final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id)); END IF; -- Set the topic configuration EXECUTE format('SET LOCAL realtime.topic TO %L', topic); -- Attempt to insert the message INSERT INTO realtime.messages (id, payload, event, topic, private, extension) VALUES (generated_id, final_payload, event, topic, private, 'broadcast'); EXCEPTION WHEN OTHERS THEN -- Capture and notify the error RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM; END; END; $$; ALTER FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean) OWNER TO supabase_admin; -- -- Name: subscription_check_filters(); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.subscription_check_filters() RETURNS trigger LANGUAGE plpgsql AS $$ /* Validates that the user defined filters for a subscription: - refer to valid columns that the claimed role may access - values are coercable to the correct column type */ declare col_names text[] = coalesce( array_agg(c.column_name order by c.ordinal_position), '{}'::text[] ) from information_schema.columns c where format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity and pg_catalog.has_column_privilege( (new.claims ->> 'role'), format('%I.%I', c.table_schema, c.table_name)::regclass, c.column_name, 'SELECT' ); filter realtime.user_defined_filter; col_type regtype; in_val jsonb; begin for filter in select * from unnest(new.filters) loop -- Filtered column is valid if not filter.column_name = any(col_names) then raise exception 'invalid column for filter %', filter.column_name; end if; -- Type is sanitized and safe for string interpolation col_type = ( select atttypid::regtype from pg_catalog.pg_attribute where attrelid = new.entity and attname = filter.column_name ); if col_type is null then raise exception 'failed to lookup type for column %', filter.column_name; end if; -- Set maximum number of entries for in filter if filter.op = 'in'::realtime.equality_op then in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype); if coalesce(jsonb_array_length(in_val), 0) > 100 then raise exception 'too many values for `in` filter. Maximum 100'; end if; else -- raises an exception if value is not coercable to type perform realtime.cast(filter.value, col_type); end if; end loop; -- Apply consistent order to filters so the unique constraint on -- (subscription_id, entity, filters) can't be tricked by a different filter order new.filters = coalesce( array_agg(f order by f.column_name, f.op, f.value), '{}' ) from unnest(new.filters) f; return new; end; $$; ALTER FUNCTION realtime.subscription_check_filters() OWNER TO supabase_admin; -- -- Name: to_regrole(text); Type: FUNCTION; Schema: realtime; Owner: supabase_admin -- CREATE FUNCTION realtime.to_regrole(role_name text) RETURNS regrole LANGUAGE sql IMMUTABLE AS $$ select role_name::regrole $$; ALTER FUNCTION realtime.to_regrole(role_name text) OWNER TO supabase_admin; -- -- Name: topic(); Type: FUNCTION; Schema: realtime; Owner: supabase_realtime_admin -- CREATE FUNCTION realtime.topic() RETURNS text LANGUAGE sql STABLE AS $$ select nullif(current_setting('realtime.topic', true), '')::text; $$; ALTER FUNCTION realtime.topic() OWNER TO supabase_realtime_admin; -- -- Name: can_insert_object(text, text, uuid, jsonb); Type: FUNCTION; Schema: storage; Owner: supabase_storage_admin --