Files
agenciapsilmno/database-novo/schema/03_functions/storage.sql
Leonardo 7c20b518d4 Sessions 1-6 accumulated: B2 hardening, defense in depth, +192 tests
The repository had gone ~5 sessions without a commit. This consolidates everything since d088a89.

See commit.md at the repository root for the full per-session description.

# Numbers
- A# audit items open: 0/30
- V# verification items open: 5/52 (all deferred with a plan)
- T# tests written: 10/10
- Vitest: 192/192
- SQL integration: 33/33
- E2E (Playwright, new): 5/5
- Migrations: 17 (10 new in Session 6)
- Areas audited: 7 (+documents area with 10 V# items)

# Session 6 highlights (today)
- V#34/V#41 Option B2: tenant_features with plan + override (SECURITY DEFINER RPC, /saas/tenant-features screen)
- A#20 rev2 self-hosted: 5-layer defense (honeypot + rate limit + conditional math captcha + paranoid mode + /saas/security dashboard)
- Document hardening (V#43-V#49): tenant scoping in storage policies (cross-clinic leakage eliminated; see the policy sketch below), validate_share_token RPC, granular signatures policy
- SaaS Twilio Config (/saas/twilio-config): editable UI for SID/webhook/pricing; AUTH_TOKEN stays in an env var
- T#9 + T#10: useAgendaEvents.spec.js + Playwright E2E (surfaced a front-end bug, since fixed)
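
Illustrative sketch of the tenant-scoped storage policy shape referenced above (not the actual migration; the bucket name, path layout, and the get_user_tenant_id() helper are assumptions):

    CREATE POLICY tenant_scoped_documents_select ON storage.objects
      FOR SELECT TO authenticated
      USING (
        bucket_id = 'documentos'
        AND (storage.foldername(name))[1] = public.get_user_tenant_id()::text
      );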

# Previous sessions (1-5), consolidated
- Session 1: auth/router/session, normalizeRole extracted
- Session 2: agenda - composables/services consolidated
- Session 3: pacientes module - tenant_id in all queries
- Session 4: public page security review - 14/15 vulnerabilities fixed
- Session 5: SaaS - P0 (A#30: 7 tables with RLS off, fixed)

# .gitignore adjusted
- supabase/* + !supabase/functions/ (keeps the 10 edge functions, ignores CLI-generated .temp/migrations)
- database-novo/backups/ (regenerable via db.cjs backup)
- test-results/ + playwright-report/
- .claude/settings.local.json (local config with a dev password, removed from tracking)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:42:46 -03:00


-- Functions: storage
-- Auto-generated on 2026-04-17T12:23:05.224Z
-- Total: 15 functions
CREATE FUNCTION storage.can_insert_object(bucketid text, name text, owner uuid, metadata jsonb) RETURNS void
    LANGUAGE plpgsql
    AS $$
BEGIN
    INSERT INTO "storage"."objects" ("bucket_id", "name", "owner", "metadata") VALUES (bucketid, name, owner, metadata);
    -- hack to rollback the successful insert
    RAISE sqlstate 'PT200' using
        message = 'ROLLBACK',
        detail = 'rollback successful insert';
END
$$;
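-- Usage sketch (assumption: this is used to dry-run an insert against RLS without
-- persisting anything; the caller catches SQLSTATE 'PT200', which the function raises
-- on purpose to roll the insert back). Bucket, key, and auth.uid() are placeholders.
--   DO $probe$
--   BEGIN
--     PERFORM storage.can_insert_object('avatars', 'user-1/photo.png', auth.uid(), '{}'::jsonb);
--   EXCEPTION
--     WHEN sqlstate 'PT200' THEN
--       RAISE NOTICE 'insert would be allowed (change rolled back)';
--   END
--   $probe$;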
CREATE FUNCTION storage.enforce_bucket_name_length() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
begin
    if length(new.name) > 100 then
        raise exception 'bucket name "%" is too long (% characters). Max is 100.', new.name, length(new.name);
    end if;
    return new;
end;
$$;
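-- Sketch of how this trigger function is typically attached (the actual trigger
-- definition is not part of this functions dump; shown for context only):
--   CREATE TRIGGER enforce_bucket_name_length_trigger
--     BEFORE INSERT OR UPDATE OF name ON storage.buckets
--     FOR EACH ROW EXECUTE FUNCTION storage.enforce_bucket_name_length();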
CREATE FUNCTION storage.extension(name text) RETURNS text
    LANGUAGE plpgsql
    AS $$
DECLARE
    _parts text[];
    _filename text;
BEGIN
    select string_to_array(name, '/') into _parts;
    select _parts[array_length(_parts,1)] into _filename;
    -- @todo return the last part instead of 2
    return reverse(split_part(reverse(_filename), '.', 1));
END
$$;
CREATE FUNCTION storage.filename(name text) RETURNS text
    LANGUAGE plpgsql
    AS $$
DECLARE
    _parts text[];
BEGIN
    select string_to_array(name, '/') into _parts;
    return _parts[array_length(_parts,1)];
END
$$;
CREATE FUNCTION storage.foldername(name text) RETURNS text[]
    LANGUAGE plpgsql
    AS $$
DECLARE
    _parts text[];
BEGIN
    select string_to_array(name, '/') into _parts;
    return _parts[1:array_length(_parts,1)-1];
END
$$;
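-- Usage sketch for the path helpers above (expected results follow directly from the
-- implementations):
--   SELECT storage.filename('tenant-1/reports/2024/summary.pdf');   -- 'summary.pdf'
--   SELECT storage.extension('tenant-1/reports/2024/summary.pdf');  -- 'pdf'
--   SELECT storage.foldername('tenant-1/reports/2024/summary.pdf'); -- {tenant-1,reports,2024}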
CREATE FUNCTION storage.get_common_prefix(p_key text, p_prefix text, p_delimiter text) RETURNS text
    LANGUAGE sql IMMUTABLE
    AS $$
    SELECT CASE
        WHEN position(p_delimiter IN substring(p_key FROM length(p_prefix) + 1)) > 0
            THEN left(p_key, length(p_prefix) + position(p_delimiter IN substring(p_key FROM length(p_prefix) + 1)))
        ELSE NULL
    END;
$$;
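-- Usage sketch: given an object key, a listing prefix and the '/' delimiter, this
-- returns the next "folder" level, or NULL when the key is a direct child (a leaf).
--   SELECT storage.get_common_prefix('docs/2024/q1/report.pdf', 'docs/', '/'); -- 'docs/2024/'
--   SELECT storage.get_common_prefix('docs/readme.txt', 'docs/', '/');         -- NULL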
CREATE FUNCTION storage.get_size_by_bucket() RETURNS TABLE(size bigint, bucket_id text)
    LANGUAGE plpgsql
    AS $$
BEGIN
    return query
        select sum((metadata->>'size')::int) as size, obj.bucket_id
        from "storage".objects as obj
        group by obj.bucket_id;
END
$$;
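-- Usage sketch: per-bucket totals, in bytes, derived from the 'size' key of each
-- object's metadata.
--   SELECT bucket_id, pg_size_pretty(size) FROM storage.get_size_by_bucket();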
CREATE FUNCTION storage.list_multipart_uploads_with_delimiter(bucket_id text, prefix_param text, delimiter_param text, max_keys integer DEFAULT 100, next_key_token text DEFAULT ''::text, next_upload_token text DEFAULT ''::text) RETURNS TABLE(key text, id text, created_at timestamp with time zone)
LANGUAGE plpgsql
AS $_$
BEGIN
RETURN QUERY EXECUTE
'SELECT DISTINCT ON(key COLLATE "C") * from (
SELECT
CASE
WHEN position($2 IN substring(key from length($1) + 1)) > 0 THEN
substring(key from 1 for length($1) + position($2 IN substring(key from length($1) + 1)))
ELSE
key
END AS key, id, created_at
FROM
storage.s3_multipart_uploads
WHERE
bucket_id = $5 AND
key ILIKE $1 || ''%'' AND
CASE
WHEN $4 != '''' AND $6 = '''' THEN
CASE
WHEN position($2 IN substring(key from length($1) + 1)) > 0 THEN
substring(key from 1 for length($1) + position($2 IN substring(key from length($1) + 1))) COLLATE "C" > $4
ELSE
key COLLATE "C" > $4
END
ELSE
true
END AND
CASE
WHEN $6 != '''' THEN
id COLLATE "C" > $6
ELSE
true
END
ORDER BY
key COLLATE "C" ASC, created_at ASC) as e order by key COLLATE "C" LIMIT $3'
USING prefix_param, delimiter_param, max_keys, next_key_token, bucket_id, next_upload_token;
END;
$_$;
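-- Usage sketch (bucket and prefix are placeholders): in-progress S3 multipart uploads
-- under 'tenant-1/' in bucket 'documentos', collapsed at the '/' delimiter, 100 per page.
--   SELECT key, id, created_at
--   FROM storage.list_multipart_uploads_with_delimiter('documentos', 'tenant-1/', '/');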
CREATE FUNCTION storage.list_objects_with_delimiter(_bucket_id text, prefix_param text, delimiter_param text, max_keys integer DEFAULT 100, start_after text DEFAULT ''::text, next_token text DEFAULT ''::text, sort_order text DEFAULT 'asc'::text) RETURNS TABLE(name text, id uuid, metadata jsonb, updated_at timestamp with time zone, created_at timestamp with time zone, last_accessed_at timestamp with time zone)
LANGUAGE plpgsql STABLE
AS $_$
DECLARE
v_peek_name TEXT;
v_current RECORD;
v_common_prefix TEXT;
-- Configuration
v_is_asc BOOLEAN;
v_prefix TEXT;
v_start TEXT;
v_upper_bound TEXT;
v_file_batch_size INT;
-- Seek state
v_next_seek TEXT;
v_count INT := 0;
-- Dynamic SQL for batch query only
v_batch_query TEXT;
BEGIN
-- ========================================================================
-- INITIALIZATION
-- ========================================================================
v_is_asc := lower(coalesce(sort_order, 'asc')) = 'asc';
v_prefix := coalesce(prefix_param, '');
v_start := CASE WHEN coalesce(next_token, '') <> '' THEN next_token ELSE coalesce(start_after, '') END;
v_file_batch_size := LEAST(GREATEST(max_keys * 2, 100), 1000);
-- Calculate upper bound for prefix filtering (bytewise, using COLLATE "C")
IF v_prefix = '' THEN
v_upper_bound := NULL;
ELSIF right(v_prefix, 1) = delimiter_param THEN
v_upper_bound := left(v_prefix, -1) || chr(ascii(delimiter_param) + 1);
ELSE
v_upper_bound := left(v_prefix, -1) || chr(ascii(right(v_prefix, 1)) + 1);
END IF;
-- Build batch query (dynamic SQL - called infrequently, amortized over many rows)
IF v_is_asc THEN
IF v_upper_bound IS NOT NULL THEN
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" >= $2 ' ||
'AND o.name COLLATE "C" < $3 ORDER BY o.name COLLATE "C" ASC LIMIT $4';
ELSE
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" >= $2 ' ||
'ORDER BY o.name COLLATE "C" ASC LIMIT $4';
END IF;
ELSE
IF v_upper_bound IS NOT NULL THEN
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" < $2 ' ||
'AND o.name COLLATE "C" >= $3 ORDER BY o.name COLLATE "C" DESC LIMIT $4';
ELSE
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND o.name COLLATE "C" < $2 ' ||
'ORDER BY o.name COLLATE "C" DESC LIMIT $4';
END IF;
END IF;
-- ========================================================================
-- SEEK INITIALIZATION: Determine starting position
-- ========================================================================
IF v_start = '' THEN
IF v_is_asc THEN
v_next_seek := v_prefix;
ELSE
-- DESC without cursor: find the last item in range
IF v_upper_bound IS NOT NULL THEN
SELECT o.name INTO v_next_seek FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_prefix AND o.name COLLATE "C" < v_upper_bound
ORDER BY o.name COLLATE "C" DESC LIMIT 1;
ELSIF v_prefix <> '' THEN
SELECT o.name INTO v_next_seek FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_prefix
ORDER BY o.name COLLATE "C" DESC LIMIT 1;
ELSE
SELECT o.name INTO v_next_seek FROM storage.objects o
WHERE o.bucket_id = _bucket_id
ORDER BY o.name COLLATE "C" DESC LIMIT 1;
END IF;
IF v_next_seek IS NOT NULL THEN
v_next_seek := v_next_seek || delimiter_param;
ELSE
RETURN;
END IF;
END IF;
ELSE
-- Cursor provided: determine if it refers to a folder or leaf
IF EXISTS (
SELECT 1 FROM storage.objects o
WHERE o.bucket_id = _bucket_id
AND o.name COLLATE "C" LIKE v_start || delimiter_param || '%'
LIMIT 1
) THEN
-- Cursor refers to a folder
IF v_is_asc THEN
v_next_seek := v_start || chr(ascii(delimiter_param) + 1);
ELSE
v_next_seek := v_start || delimiter_param;
END IF;
ELSE
-- Cursor refers to a leaf object
IF v_is_asc THEN
v_next_seek := v_start || delimiter_param;
ELSE
v_next_seek := v_start;
END IF;
END IF;
END IF;
-- ========================================================================
-- MAIN LOOP: Hybrid peek-then-batch algorithm
-- Uses STATIC SQL for peek (hot path) and DYNAMIC SQL for batch
-- ========================================================================
LOOP
EXIT WHEN v_count >= max_keys;
-- STEP 1: PEEK using STATIC SQL (plan cached, very fast)
IF v_is_asc THEN
IF v_upper_bound IS NOT NULL THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_next_seek AND o.name COLLATE "C" < v_upper_bound
ORDER BY o.name COLLATE "C" ASC LIMIT 1;
ELSE
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" >= v_next_seek
ORDER BY o.name COLLATE "C" ASC LIMIT 1;
END IF;
ELSE
IF v_upper_bound IS NOT NULL THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" < v_next_seek AND o.name COLLATE "C" >= v_prefix
ORDER BY o.name COLLATE "C" DESC LIMIT 1;
ELSIF v_prefix <> '' THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" < v_next_seek AND o.name COLLATE "C" >= v_prefix
ORDER BY o.name COLLATE "C" DESC LIMIT 1;
ELSE
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = _bucket_id AND o.name COLLATE "C" < v_next_seek
ORDER BY o.name COLLATE "C" DESC LIMIT 1;
END IF;
END IF;
EXIT WHEN v_peek_name IS NULL;
-- STEP 2: Check if this is a FOLDER or FILE
v_common_prefix := storage.get_common_prefix(v_peek_name, v_prefix, delimiter_param);
IF v_common_prefix IS NOT NULL THEN
-- FOLDER: Emit and skip to next folder (no heap access needed)
name := rtrim(v_common_prefix, delimiter_param);
id := NULL;
updated_at := NULL;
created_at := NULL;
last_accessed_at := NULL;
metadata := NULL;
RETURN NEXT;
v_count := v_count + 1;
-- Advance seek past the folder range
IF v_is_asc THEN
v_next_seek := left(v_common_prefix, -1) || chr(ascii(delimiter_param) + 1);
ELSE
v_next_seek := v_common_prefix;
END IF;
ELSE
-- FILE: Batch fetch using DYNAMIC SQL (overhead amortized over many rows)
-- For ASC: upper_bound is the exclusive upper limit (< condition)
-- For DESC: prefix is the inclusive lower limit (>= condition)
FOR v_current IN EXECUTE v_batch_query USING _bucket_id, v_next_seek,
CASE WHEN v_is_asc THEN COALESCE(v_upper_bound, v_prefix) ELSE v_prefix END, v_file_batch_size
LOOP
v_common_prefix := storage.get_common_prefix(v_current.name, v_prefix, delimiter_param);
IF v_common_prefix IS NOT NULL THEN
-- Hit a folder: exit batch, let peek handle it
v_next_seek := v_current.name;
EXIT;
END IF;
-- Emit file
name := v_current.name;
id := v_current.id;
updated_at := v_current.updated_at;
created_at := v_current.created_at;
last_accessed_at := v_current.last_accessed_at;
metadata := v_current.metadata;
RETURN NEXT;
v_count := v_count + 1;
-- Advance seek past this file
IF v_is_asc THEN
v_next_seek := v_current.name || delimiter_param;
ELSE
v_next_seek := v_current.name;
END IF;
EXIT WHEN v_count >= max_keys;
END LOOP;
END IF;
END LOOP;
END;
$_$;
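-- Usage sketch (bucket and prefix are placeholders): up to 50 entries under 'tenant-1/'
-- in bucket 'documentos', in bytewise (COLLATE "C") name order; rows with a NULL id are
-- folder-style common prefixes, the rest are objects.
--   SELECT name, id, metadata
--   FROM storage.list_objects_with_delimiter('documentos', 'tenant-1/', '/', 50);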
CREATE FUNCTION storage.operation() RETURNS text
    LANGUAGE plpgsql STABLE
    AS $$
BEGIN
    RETURN current_setting('storage.operation', true);
END;
$$;
CREATE FUNCTION storage.protect_delete() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
BEGIN
    -- Check if storage.allow_delete_query is set to 'true'
    IF COALESCE(current_setting('storage.allow_delete_query', true), 'false') != 'true' THEN
        RAISE EXCEPTION 'Direct deletion from storage tables is not allowed. Use the Storage API instead.'
            USING HINT = 'This prevents accidental data loss from orphaned objects.',
                  ERRCODE = '42501';
    END IF;
    RETURN NULL;
END;
$$;
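-- Usage sketch: direct DELETEs on storage tables are blocked unless the
-- storage.allow_delete_query setting is 'true' for the current transaction.
-- Illustration only (assumed maintenance scenario), not a recommended bypass:
--   BEGIN;
--   SET LOCAL storage.allow_delete_query = 'true';
--   DELETE FROM storage.objects WHERE bucket_id = 'tmp-bucket' AND created_at < now() - interval '30 days';
--   COMMIT;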
CREATE FUNCTION storage.search(prefix text, bucketname text, limits integer DEFAULT 100, levels integer DEFAULT 1, offsets integer DEFAULT 0, search text DEFAULT ''::text, sortcolumn text DEFAULT 'name'::text, sortorder text DEFAULT 'asc'::text) RETURNS TABLE(name text, id uuid, updated_at timestamp with time zone, created_at timestamp with time zone, last_accessed_at timestamp with time zone, metadata jsonb)
LANGUAGE plpgsql STABLE
AS $_$
DECLARE
v_peek_name TEXT;
v_current RECORD;
v_common_prefix TEXT;
v_delimiter CONSTANT TEXT := '/';
-- Configuration
v_limit INT;
v_prefix TEXT;
v_prefix_lower TEXT;
v_is_asc BOOLEAN;
v_order_by TEXT;
v_sort_order TEXT;
v_upper_bound TEXT;
v_file_batch_size INT;
-- Dynamic SQL for batch query only
v_batch_query TEXT;
-- Seek state
v_next_seek TEXT;
v_count INT := 0;
v_skipped INT := 0;
BEGIN
-- ========================================================================
-- INITIALIZATION
-- ========================================================================
v_limit := LEAST(coalesce(limits, 100), 1500);
v_prefix := coalesce(prefix, '') || coalesce(search, '');
v_prefix_lower := lower(v_prefix);
v_is_asc := lower(coalesce(sortorder, 'asc')) = 'asc';
v_file_batch_size := LEAST(GREATEST(v_limit * 2, 100), 1000);
-- Validate sort column
CASE lower(coalesce(sortcolumn, 'name'))
WHEN 'name' THEN v_order_by := 'name';
WHEN 'updated_at' THEN v_order_by := 'updated_at';
WHEN 'created_at' THEN v_order_by := 'created_at';
WHEN 'last_accessed_at' THEN v_order_by := 'last_accessed_at';
ELSE v_order_by := 'name';
END CASE;
v_sort_order := CASE WHEN v_is_asc THEN 'asc' ELSE 'desc' END;
-- ========================================================================
-- NON-NAME SORTING: Use path_tokens approach (unchanged)
-- ========================================================================
IF v_order_by != 'name' THEN
RETURN QUERY EXECUTE format(
$sql$
WITH folders AS (
SELECT path_tokens[$1] AS folder
FROM storage.objects
WHERE objects.name ILIKE $2 || '%%'
AND bucket_id = $3
AND array_length(objects.path_tokens, 1) <> $1
GROUP BY folder
ORDER BY folder %s
)
(SELECT folder AS "name",
NULL::uuid AS id,
NULL::timestamptz AS updated_at,
NULL::timestamptz AS created_at,
NULL::timestamptz AS last_accessed_at,
NULL::jsonb AS metadata FROM folders)
UNION ALL
(SELECT path_tokens[$1] AS "name",
id, updated_at, created_at, last_accessed_at, metadata
FROM storage.objects
WHERE objects.name ILIKE $2 || '%%'
AND bucket_id = $3
AND array_length(objects.path_tokens, 1) = $1
ORDER BY %I %s)
LIMIT $4 OFFSET $5
$sql$, v_sort_order, v_order_by, v_sort_order
) USING levels, v_prefix, bucketname, v_limit, offsets;
RETURN;
END IF;
-- ========================================================================
-- NAME SORTING: Hybrid skip-scan with batch optimization
-- ========================================================================
-- Calculate upper bound for prefix filtering
IF v_prefix_lower = '' THEN
v_upper_bound := NULL;
ELSIF right(v_prefix_lower, 1) = v_delimiter THEN
v_upper_bound := left(v_prefix_lower, -1) || chr(ascii(v_delimiter) + 1);
ELSE
v_upper_bound := left(v_prefix_lower, -1) || chr(ascii(right(v_prefix_lower, 1)) + 1);
END IF;
-- Build batch query (dynamic SQL - called infrequently, amortized over many rows)
IF v_is_asc THEN
IF v_upper_bound IS NOT NULL THEN
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" >= $2 ' ||
'AND lower(o.name) COLLATE "C" < $3 ORDER BY lower(o.name) COLLATE "C" ASC LIMIT $4';
ELSE
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" >= $2 ' ||
'ORDER BY lower(o.name) COLLATE "C" ASC LIMIT $4';
END IF;
ELSE
IF v_upper_bound IS NOT NULL THEN
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" < $2 ' ||
'AND lower(o.name) COLLATE "C" >= $3 ORDER BY lower(o.name) COLLATE "C" DESC LIMIT $4';
ELSE
v_batch_query := 'SELECT o.name, o.id, o.updated_at, o.created_at, o.last_accessed_at, o.metadata ' ||
'FROM storage.objects o WHERE o.bucket_id = $1 AND lower(o.name) COLLATE "C" < $2 ' ||
'ORDER BY lower(o.name) COLLATE "C" DESC LIMIT $4';
END IF;
END IF;
-- Initialize seek position
IF v_is_asc THEN
v_next_seek := v_prefix_lower;
ELSE
-- DESC: find the last item in range first (static SQL)
IF v_upper_bound IS NOT NULL THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_prefix_lower AND lower(o.name) COLLATE "C" < v_upper_bound
ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
ELSIF v_prefix_lower <> '' THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_prefix_lower
ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
ELSE
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname
ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
END IF;
IF v_peek_name IS NOT NULL THEN
v_next_seek := lower(v_peek_name) || v_delimiter;
ELSE
RETURN;
END IF;
END IF;
-- ========================================================================
-- MAIN LOOP: Hybrid peek-then-batch algorithm
-- Uses STATIC SQL for peek (hot path) and DYNAMIC SQL for batch
-- ========================================================================
LOOP
EXIT WHEN v_count >= v_limit;
-- STEP 1: PEEK using STATIC SQL (plan cached, very fast)
IF v_is_asc THEN
IF v_upper_bound IS NOT NULL THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_next_seek AND lower(o.name) COLLATE "C" < v_upper_bound
ORDER BY lower(o.name) COLLATE "C" ASC LIMIT 1;
ELSE
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" >= v_next_seek
ORDER BY lower(o.name) COLLATE "C" ASC LIMIT 1;
END IF;
ELSE
IF v_upper_bound IS NOT NULL THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" < v_next_seek AND lower(o.name) COLLATE "C" >= v_prefix_lower
ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
ELSIF v_prefix_lower <> '' THEN
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" < v_next_seek AND lower(o.name) COLLATE "C" >= v_prefix_lower
ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
ELSE
SELECT o.name INTO v_peek_name FROM storage.objects o
WHERE o.bucket_id = bucketname AND lower(o.name) COLLATE "C" < v_next_seek
ORDER BY lower(o.name) COLLATE "C" DESC LIMIT 1;
END IF;
END IF;
EXIT WHEN v_peek_name IS NULL;
-- STEP 2: Check if this is a FOLDER or FILE
v_common_prefix := storage.get_common_prefix(lower(v_peek_name), v_prefix_lower, v_delimiter);
IF v_common_prefix IS NOT NULL THEN
-- FOLDER: Handle offset, emit if needed, skip to next folder
IF v_skipped < offsets THEN
v_skipped := v_skipped + 1;
ELSE
name := split_part(rtrim(v_common_prefix, v_delimiter), v_delimiter, levels);
id := NULL;
updated_at := NULL;
created_at := NULL;
last_accessed_at := NULL;
metadata := NULL;
RETURN NEXT;
v_count := v_count + 1;
END IF;
-- Advance seek past the folder range
IF v_is_asc THEN
v_next_seek := lower(left(v_common_prefix, -1)) || chr(ascii(v_delimiter) + 1);
ELSE
v_next_seek := lower(v_common_prefix);
END IF;
ELSE
-- FILE: Batch fetch using DYNAMIC SQL (overhead amortized over many rows)
-- For ASC: upper_bound is the exclusive upper limit (< condition)
-- For DESC: prefix_lower is the inclusive lower limit (>= condition)
FOR v_current IN EXECUTE v_batch_query
USING bucketname, v_next_seek,
CASE WHEN v_is_asc THEN COALESCE(v_upper_bound, v_prefix_lower) ELSE v_prefix_lower END, v_file_batch_size
LOOP
v_common_prefix := storage.get_common_prefix(lower(v_current.name), v_prefix_lower, v_delimiter);
IF v_common_prefix IS NOT NULL THEN
-- Hit a folder: exit batch, let peek handle it
v_next_seek := lower(v_current.name);
EXIT;
END IF;
-- Handle offset skipping
IF v_skipped < offsets THEN
v_skipped := v_skipped + 1;
ELSE
-- Emit file
name := split_part(v_current.name, v_delimiter, levels);
id := v_current.id;
updated_at := v_current.updated_at;
created_at := v_current.created_at;
last_accessed_at := v_current.last_accessed_at;
metadata := v_current.metadata;
RETURN NEXT;
v_count := v_count + 1;
END IF;
-- Advance seek past this file
IF v_is_asc THEN
v_next_seek := lower(v_current.name) || v_delimiter;
ELSE
v_next_seek := lower(v_current.name);
END IF;
EXIT WHEN v_count >= v_limit;
END LOOP;
END IF;
END LOOP;
END;
$_$;
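-- Usage sketch (bucket and prefix are placeholders): one page of 25 entries under
-- 'tenant-1/' in bucket 'documentos'; levels = 2 makes the returned name relative to
-- the one-level prefix. Passing sortcolumn => 'created_at' takes the path_tokens
-- branch above instead of the seek loop.
--   SELECT name, id, metadata->>'size' AS size
--   FROM storage.search('tenant-1/', 'documentos', 25, 2, 0);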
CREATE FUNCTION storage.search_by_timestamp(p_prefix text, p_bucket_id text, p_limit integer, p_level integer, p_start_after text, p_sort_order text, p_sort_column text, p_sort_column_after text) RETURNS TABLE(key text, name text, id uuid, updated_at timestamp with time zone, created_at timestamp with time zone, last_accessed_at timestamp with time zone, metadata jsonb)
LANGUAGE plpgsql STABLE
AS $_$
DECLARE
v_cursor_op text;
v_query text;
v_prefix text;
BEGIN
v_prefix := coalesce(p_prefix, '');
IF p_sort_order = 'asc' THEN
v_cursor_op := '>';
ELSE
v_cursor_op := '<';
END IF;
v_query := format($sql$
WITH raw_objects AS (
SELECT
o.name AS obj_name,
o.id AS obj_id,
o.updated_at AS obj_updated_at,
o.created_at AS obj_created_at,
o.last_accessed_at AS obj_last_accessed_at,
o.metadata AS obj_metadata,
storage.get_common_prefix(o.name, $1, '/') AS common_prefix
FROM storage.objects o
WHERE o.bucket_id = $2
AND o.name COLLATE "C" LIKE $1 || '%%'
),
-- Aggregate common prefixes (folders)
-- Both created_at and updated_at use MIN(obj_created_at) to match the old prefixes table behavior
aggregated_prefixes AS (
SELECT
rtrim(common_prefix, '/') AS name,
NULL::uuid AS id,
MIN(obj_created_at) AS updated_at,
MIN(obj_created_at) AS created_at,
NULL::timestamptz AS last_accessed_at,
NULL::jsonb AS metadata,
TRUE AS is_prefix
FROM raw_objects
WHERE common_prefix IS NOT NULL
GROUP BY common_prefix
),
leaf_objects AS (
SELECT
obj_name AS name,
obj_id AS id,
obj_updated_at AS updated_at,
obj_created_at AS created_at,
obj_last_accessed_at AS last_accessed_at,
obj_metadata AS metadata,
FALSE AS is_prefix
FROM raw_objects
WHERE common_prefix IS NULL
),
combined AS (
SELECT * FROM aggregated_prefixes
UNION ALL
SELECT * FROM leaf_objects
),
filtered AS (
SELECT *
FROM combined
WHERE (
$5 = ''
OR ROW(
date_trunc('milliseconds', %I),
name COLLATE "C"
) %s ROW(
COALESCE(NULLIF($6, '')::timestamptz, 'epoch'::timestamptz),
$5
)
)
)
SELECT
split_part(name, '/', $3) AS key,
name,
id,
updated_at,
created_at,
last_accessed_at,
metadata
FROM filtered
ORDER BY
COALESCE(date_trunc('milliseconds', %I), 'epoch'::timestamptz) %s,
name COLLATE "C" %s
LIMIT $4
$sql$,
p_sort_column,
v_cursor_op,
p_sort_column,
p_sort_order,
p_sort_order
);
RETURN QUERY EXECUTE v_query
USING v_prefix, p_bucket_id, p_level, p_limit, p_start_after, p_sort_column_after;
END;
$_$;
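-- Usage sketch (placeholders throughout): newest entries under 'tenant-1/' first;
-- keyset pagination is driven by (p_sort_column, name) via p_start_after and
-- p_sort_column_after on subsequent pages.
--   SELECT key, name, created_at
--   FROM storage.search_by_timestamp('tenant-1/', 'documentos', 25, 2, '', 'desc', 'created_at', '');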
CREATE FUNCTION storage.search_v2(prefix text, bucket_name text, limits integer DEFAULT 100, levels integer DEFAULT 1, start_after text DEFAULT ''::text, sort_order text DEFAULT 'asc'::text, sort_column text DEFAULT 'name'::text, sort_column_after text DEFAULT ''::text) RETURNS TABLE(key text, name text, id uuid, updated_at timestamp with time zone, created_at timestamp with time zone, last_accessed_at timestamp with time zone, metadata jsonb)
LANGUAGE plpgsql STABLE
AS $$
DECLARE
v_sort_col text;
v_sort_ord text;
v_limit int;
BEGIN
-- Cap limit to maximum of 1500 records
v_limit := LEAST(coalesce(limits, 100), 1500);
-- Validate and normalize sort_order
v_sort_ord := lower(coalesce(sort_order, 'asc'));
IF v_sort_ord NOT IN ('asc', 'desc') THEN
v_sort_ord := 'asc';
END IF;
-- Validate and normalize sort_column
v_sort_col := lower(coalesce(sort_column, 'name'));
IF v_sort_col NOT IN ('name', 'updated_at', 'created_at') THEN
v_sort_col := 'name';
END IF;
-- Route to appropriate implementation
IF v_sort_col = 'name' THEN
-- Use list_objects_with_delimiter for name sorting (most efficient: O(k * log n))
RETURN QUERY
SELECT
split_part(l.name, '/', levels) AS key,
l.name AS name,
l.id,
l.updated_at,
l.created_at,
l.last_accessed_at,
l.metadata
FROM storage.list_objects_with_delimiter(
bucket_name,
coalesce(prefix, ''),
'/',
v_limit,
start_after,
'',
v_sort_ord
) l;
ELSE
-- Use aggregation approach for timestamp sorting
-- Not efficient for large datasets but supports correct pagination
RETURN QUERY SELECT * FROM storage.search_by_timestamp(
prefix, bucket_name, v_limit, levels, start_after,
v_sort_ord, v_sort_col, sort_column_after
);
END IF;
END;
$$;
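-- Usage sketch (placeholders): with the default sort_column = 'name' this delegates to
-- list_objects_with_delimiter; sort_column = 'created_at' or 'updated_at' routes to
-- search_by_timestamp instead.
--   SELECT key, name FROM storage.search_v2('tenant-1/', 'documentos', 50, 2);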
CREATE FUNCTION storage.update_updated_at_column() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
BEGIN
    NEW.updated_at = now();
    RETURN NEW;
END;
$$;
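-- Sketch of a typical attachment for this trigger function (illustration only; the
-- real trigger lives outside this functions dump):
--   CREATE TRIGGER update_objects_updated_at
--     BEFORE UPDATE ON storage.objects
--     FOR EACH ROW EXECUTE FUNCTION storage.update_updated_at_column();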