-- =============================================================================
-- F6.2 Batch E — cron/global RPCs routed to, or looped over, tenant schemas
--
-- ⚠️ APPLY AS supabase_admin.
--   docker exec -i -e PGPASSWORD=postgres supabase_db_agenciapsi-primesakai \
--     psql -U supabase_admin -h 127.0.0.1 -d postgres -v ON_ERROR_STOP=1 \
--     < database-novo/manual/f6_2e_cron_rpcs.supabase_admin.sql
--
-- E1: per-tenant calls made by the edge crons → p_tenant_id + set_config search_path
--     (helper public._tenant_route from Batch D). Edge adjusted (admin.rpc + p_tenant_id).
-- E2: no-argument crons that sweep ALL tenants → loop FROM tenant_schemas.
-- =============================================================================
BEGIN;

-- Schema resolver WITHOUT an auth check, meant for SERVICE RPCs (called by
-- service_role/edge, which is not a tenant_member).
--   p_tenant_id : tenant whose schema is wanted.
--   returns     : whatever public.tenant_schema_for() yields for that tenant.
--   raises      : when p_tenant_id is NULL or no schema is found.
CREATE OR REPLACE FUNCTION public._tenant_schema_unchecked(p_tenant_id uuid)
RETURNS text
LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  v_schema text;
BEGIN
  IF p_tenant_id IS NULL THEN
    RAISE EXCEPTION 'p_tenant_id obrigatório';
  END IF;

  v_schema := public.tenant_schema_for(p_tenant_id);
  IF v_schema IS NULL THEN
    RAISE EXCEPTION 'schema não encontrado p/ tenant %', p_tenant_id;
  END IF;

  RETURN v_schema;
END $$;

-- New functions are executable by PUBLIC by default. Because this helper skips
-- the auth check, lock it down directly instead of relying only on the REVOKE
-- applied to the RPCs that call it. Its SECURITY DEFINER callers run as the
-- function owner and are unaffected.
REVOKE ALL ON FUNCTION public._tenant_schema_unchecked(uuid) FROM PUBLIC, anon, authenticated;
GRANT EXECUTE ON FUNCTION public._tenant_schema_unchecked(uuid) TO service_role;

-- ───────────────────────────────────────────────────────────────────────────
-- E2 — global crons: sweep every tenant schema
-- ───────────────────────────────────────────────────────────────────────────

-- Deletes, in every tenant schema, notification_queue rows whose status is
-- 'enviado', 'cancelado' or 'ignorado' and that were created more than 90 days ago.
-- Returns the total number of rows deleted across all schemas.
CREATE OR REPLACE FUNCTION public.cleanup_notification_queue()
RETURNS integer
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  t       record;
  v_n     int;
  v_total int := 0;
BEGIN
  FOR t IN SELECT schema_name FROM public.tenant_schemas LOOP
    -- %I quotes the schema name as an identifier.
    EXECUTE format($q$
      DELETE FROM %I.notification_queue
       WHERE status IN ('enviado','cancelado','ignorado')
         AND created_at < now() - interval '90 days'
    $q$, t.schema_name);
    GET DIAGNOSTICS v_n = ROW_COUNT;
    v_total := v_total + v_n;
  END LOOP;
  RETURN v_total;
END $$;

-- Puts back to 'pendente', in every tenant schema, notification_queue rows that
-- have been in 'processando' for more than 10 minutes: bumps attempts, records
-- the timeout in last_error and schedules a retry 2 minutes from now.
-- Returns the total number of rows updated across all schemas.
CREATE OR REPLACE FUNCTION public.unstick_notification_queue()
RETURNS integer
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  t       record;
  v_n     int;
  v_total int := 0;
BEGIN
  FOR t IN SELECT schema_name FROM public.tenant_schemas LOOP
    EXECUTE format($q$
      UPDATE %I.notification_queue
         SET status        = 'pendente',
             attempts      = attempts + 1,
             last_error    = 'Timeout: preso em processando por >10min',
             next_retry_at = now() + interval '2 minutes'
       WHERE status = 'processando'
         AND updated_at < now() - interval '10 minutes'
    $q$, t.schema_name);
    GET DIAGNOSTICS v_n = ROW_COUNT;
    v_total := v_total + v_n;
  END LOOP;
  RETURN v_total;
END $$;

-- Marks as 'overdue', in every tenant schema, financial_records that are still
-- 'pending', not soft-deleted, and whose due_date is before CURRENT_DATE.
-- Returns the total number of rows updated across all schemas.
CREATE OR REPLACE FUNCTION public.sync_overdue_financial_records()
RETURNS integer
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  t       record;
  v_n     int;
  v_total int := 0;
BEGIN
  FOR t IN SELECT schema_name FROM public.tenant_schemas LOOP
    EXECUTE format($q$
      UPDATE %I.financial_records
         SET status     = 'overdue',
             updated_at = now()
       WHERE status = 'pending'
         AND due_date IS NOT NULL
         AND due_date < CURRENT_DATE
         AND deleted_at IS NULL
    $q$, t.schema_name);
    GET DIAGNOSTICS v_n = ROW_COUNT;
    v_total := v_total + v_n;
  END LOOP;
  RETURN v_total;
END $$;

-- populate: complex (multi-table). set_config search_path per tenant; profiles
-- is GLOBAL → schema-qualified. tenant_id is removed from the INSERT and the SELECT.
-- Enqueues session-reminder notifications for every tenant schema.
-- For each schema listed in public.tenant_schemas the search_path is pointed at
-- that schema (transaction-local), so the unqualified tables below resolve to
-- the tenant's copies; public.profiles is global and stays schema-qualified.
-- Rows that collide on idempotency_key are skipped.
CREATE OR REPLACE FUNCTION public.populate_notification_queue()
RETURNS void
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  t record;
BEGIN
  FOR t IN SELECT schema_name FROM public.tenant_schemas LOOP
    -- quote_ident keeps this consistent with the %I quoting used by the other
    -- tenant-sweeping crons: search_path is parsed as a list of identifiers, so
    -- a name that needs quoting would otherwise be case-folded or rejected.
    PERFORM set_config('search_path', quote_ident(t.schema_name) || ',public,pg_temp', true);

    INSERT INTO notification_queue (
      owner_id, agenda_evento_id, patient_id, channel, template_key, schedule_key,
      resolved_vars, recipient_address, scheduled_at, idempotency_key)
    SELECT
      ae.owner_id,
      ae.id,
      ae.patient_id,
      ch.channel,
      'session.' || REPLACE(ns.event_type, '_sessao', '') || '.' || ch.channel,
      ns.schedule_key,
      jsonb_build_object(
        'nome_paciente',  COALESCE(p.nome_completo,'Paciente'),
        'data_sessao',    TO_CHAR(ae.inicio_em AT TIME ZONE 'America/Sao_Paulo','DD/MM/YYYY'),
        'hora_sessao',    TO_CHAR(ae.inicio_em AT TIME ZONE 'America/Sao_Paulo','HH24:MI'),
        'nome_terapeuta', COALESCE(prof.full_name,'Terapeuta'),
        'modalidade',     COALESCE(ae.modalidade,'Presencial'),
        'titulo',         COALESCE(ae.titulo,'Sessão')),
      -- recipient address depends on the channel; missing contact data becomes ''
      CASE ch.channel
        WHEN 'whatsapp' THEN COALESCE(p.telefone,'')
        WHEN 'sms'      THEN COALESCE(p.telefone,'')
        WHEN 'email'    THEN COALESCE(p.email_principal,'')
      END,
      -- scheduled_at = event start minus the offset; when that time of day falls
      -- outside [allowed_time_start, allowed_time_end] it is moved to
      -- allowed_time_start of the same day.
      CASE
        WHEN (ae.inicio_em - (ns.offset_minutes||' minutes')::interval)::time < ns.allowed_time_start
          THEN DATE_TRUNC('day', ae.inicio_em - (ns.offset_minutes||' minutes')::interval) + ns.allowed_time_start
        WHEN (ae.inicio_em - (ns.offset_minutes||' minutes')::interval)::time > ns.allowed_time_end
          THEN DATE_TRUNC('day', ae.inicio_em - (ns.offset_minutes||' minutes')::interval) + ns.allowed_time_start
        ELSE ae.inicio_em - (ns.offset_minutes||' minutes')::interval
      END,
      ae.id::text||':'||ns.schedule_key||':'||ch.channel||':'||ae.inicio_em::date::text
    FROM agenda_eventos ae
    JOIN patients p
      ON p.id = ae.patient_id
    LEFT JOIN public.profiles prof          -- GLOBAL table
      ON prof.id = ae.owner_id
    JOIN notification_schedules ns
      ON ns.owner_id = ae.owner_id
     AND ns.is_active=true
     AND ns.deleted_at IS NULL
     AND ns.trigger_type='before_event'
     AND ns.event_type='lembrete_sessao'
    JOIN notification_channels nc
      ON nc.owner_id = ae.owner_id
     AND nc.is_active=true
     AND nc.deleted_at IS NULL
    -- one row per channel that is both enabled on the schedule and configured
    CROSS JOIN LATERAL (
      SELECT 'whatsapp' AS channel WHERE ns.whatsapp_enabled AND nc.channel='whatsapp'
      UNION ALL
      SELECT 'email' WHERE ns.email_enabled AND nc.channel='email'
      UNION ALL
      SELECT 'sms' WHERE ns.sms_enabled AND nc.channel='sms') ch
    -- NOTE(review): np is joined but never referenced in the SELECT or WHERE —
    -- confirm whether a preference/opt-out filter is missing here.
    LEFT JOIN notification_preferences np
      ON np.patient_id = ae.patient_id
     AND np.owner_id = ae.owner_id
     AND np.deleted_at IS NULL
    WHERE ae.tipo = 'sessao'
      AND ae.status NOT IN ('cancelado','realizado')
      AND ae.inicio_em > now()
      AND (ae.inicio_em - (ns.offset_minutes||' minutes')::interval) > now()
    ON CONFLICT (idempotency_key) DO NOTHING;
  END LOOP;
END $$;

-- ───────────────────────────────────────────────────────────────────────────
-- E1 — per-tenant calls (p_tenant_id + route)
-- ───────────────────────────────────────────────────────────────────────────
-- NOTE(review): the E1 functions below concatenate the resolved schema name into
-- search_path unquoted; this assumes public.tenant_schema_for() returns a plain
-- identifier that needs no quoting — confirm.

-- Opens an SLA breach for a thread in the tenant's schema, or refreshes the
-- already-open one (resolved_at IS NULL).
--   returns : id of the existing or newly inserted breach.
--   raises  : 'tenant_and_thread_required' when p_tenant_id or p_thread_key is NULL.
CREATE OR REPLACE FUNCTION public.sla_open_breach(
  p_tenant_id uuid,
  p_thread_key text,
  p_assigned_to uuid,
  p_last_inbound_at timestamptz,
  p_threshold_minutes integer)
RETURNS uuid
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  v_existing_id uuid;
  v_new_id      uuid;
BEGIN
  IF p_tenant_id IS NULL OR p_thread_key IS NULL THEN
    RAISE EXCEPTION 'tenant_and_thread_required';
  END IF;

  PERFORM set_config('search_path', public._tenant_schema_unchecked(p_tenant_id) || ',public,pg_temp', true);

  SELECT id INTO v_existing_id
    FROM conversation_sla_breaches
   WHERE thread_key = p_thread_key
     AND resolved_at IS NULL;

  IF FOUND THEN
    -- NULL arguments leave the stored values untouched.
    UPDATE conversation_sla_breaches
       SET assigned_to     = COALESCE(p_assigned_to, assigned_to),
           last_inbound_at = COALESCE(p_last_inbound_at, last_inbound_at)
     WHERE id = v_existing_id;
    RETURN v_existing_id;
  END IF;

  INSERT INTO conversation_sla_breaches (thread_key, assigned_to, last_inbound_at, threshold_minutes_at_breach)
  VALUES (p_thread_key, p_assigned_to, p_last_inbound_at, p_threshold_minutes)
  RETURNING id INTO v_new_id;

  RETURN v_new_id;
END $$;

-- Both the one-argument and the (tenant, breach) signatures are dropped so the
-- plain CREATE below cannot collide with an existing definition.
DROP FUNCTION IF EXISTS public.sla_mark_notified(uuid);
DROP FUNCTION IF EXISTS public.sla_mark_notified(uuid,uuid);

-- Stamps notified_at = now() and increments notification_count on one breach
-- in the tenant's schema.
CREATE FUNCTION public.sla_mark_notified(p_tenant_id uuid, p_breach_id uuid)
RETURNS void
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
BEGIN
  PERFORM set_config('search_path', public._tenant_schema_unchecked(p_tenant_id) || ',public,pg_temp', true);

  UPDATE conversation_sla_breaches
     SET notified_at        = now(),
         notification_count = notification_count + 1
   WHERE id = p_breach_id;
END $$;

DROP FUNCTION IF EXISTS public.whatsapp_heartbeat_open_incident(uuid, text, text, jsonb);
DROP FUNCTION IF EXISTS public.whatsapp_heartbeat_open_incident(uuid, uuid, text, text, jsonb);

-- Opens a WhatsApp connection incident for a channel in the tenant's schema, or
-- refreshes the already-open one (resolved_at IS NULL).
--   returns : id of the existing or newly inserted incident.
--   raises  : 'channel_not_found' when the channel does not exist or is soft-deleted;
--             'invalid_kind: …' when p_kind is NULL or not one of the accepted values.
CREATE FUNCTION public.whatsapp_heartbeat_open_incident(
  p_tenant_id uuid,
  p_channel_id uuid,
  p_kind text,
  p_last_state text DEFAULT NULL,
  p_details jsonb DEFAULT NULL)
RETURNS uuid
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  v_provider    text;
  v_existing_id uuid;
  v_new_id      uuid;
BEGIN
  PERFORM set_config('search_path', public._tenant_schema_unchecked(p_tenant_id) || ',public,pg_temp', true);

  SELECT provider INTO v_provider
    FROM notification_channels
   WHERE id = p_channel_id
     AND deleted_at IS NULL;
  IF NOT FOUND THEN
    RAISE EXCEPTION 'channel_not_found';
  END IF;

  -- NULL NOT IN (...) evaluates to NULL, which IF treats as false; without the
  -- explicit IS NULL test a NULL kind would slip past this validation.
  IF p_kind IS NULL OR p_kind NOT IN ('disconnected','error','qr_pending','connecting','unknown') THEN
    RAISE EXCEPTION 'invalid_kind: %', p_kind;
  END IF;

  SELECT id INTO v_existing_id
    FROM whatsapp_connection_incidents
   WHERE channel_id = p_channel_id
     AND resolved_at IS NULL;

  IF FOUND THEN
    -- kind is always overwritten; state/details only when provided.
    UPDATE whatsapp_connection_incidents
       SET last_state = COALESCE(p_last_state, last_state),
           details    = COALESCE(p_details, details),
           kind       = p_kind
     WHERE id = v_existing_id;
    RETURN v_existing_id;
  END IF;

  INSERT INTO whatsapp_connection_incidents (channel_id, provider, kind, last_state, details)
  VALUES (p_channel_id, v_provider, p_kind, p_last_state, p_details)
  RETURNING id INTO v_new_id;

  RETURN v_new_id;
END $$;

DROP FUNCTION IF EXISTS public.whatsapp_heartbeat_mark_notified(uuid);
DROP FUNCTION IF EXISTS public.whatsapp_heartbeat_mark_notified(uuid,uuid);

-- Stamps notified_at = now() and increments notification_count on one incident
-- in the tenant's schema.
CREATE FUNCTION public.whatsapp_heartbeat_mark_notified(p_tenant_id uuid, p_incident_id uuid)
RETURNS void
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
BEGIN
  PERFORM set_config('search_path', public._tenant_schema_unchecked(p_tenant_id) || ',public,pg_temp', true);

  UPDATE whatsapp_connection_incidents
     SET notified_at        = now(),
         notification_count = notification_count + 1
   WHERE id = p_incident_id;
END $$;

DROP FUNCTION IF EXISTS public.whatsapp_heartbeat_resolve_open_incidents(uuid);
DROP FUNCTION IF EXISTS public.whatsapp_heartbeat_resolve_open_incidents(uuid,uuid);

-- Resolves every open incident of a channel in the tenant's schema, recording
-- the elapsed time since started_at in whole seconds.
-- Returns the number of incidents resolved.
CREATE FUNCTION public.whatsapp_heartbeat_resolve_open_incidents(p_tenant_id uuid, p_channel_id uuid)
RETURNS integer
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  v_count int := 0;
BEGIN
  PERFORM set_config('search_path', public._tenant_schema_unchecked(p_tenant_id) || ',public,pg_temp', true);

  UPDATE whatsapp_connection_incidents
     SET resolved_at      = now(),
         duration_seconds = EXTRACT(EPOCH FROM (now() - started_at))::int
   WHERE channel_id = p_channel_id
     AND resolved_at IS NULL;

  GET DIAGNOSTICS v_count = ROW_COUNT;
  RETURN v_count;
END $$;

-- convert_abandoned_intake_to_lead: resolves the tenant INTERNALLY (intake.owner_id
-- -> tenant_members). patient_intake_requests STAYS in public (F1b). Writes to
-- conversation_messages/notes go to the resolved tenant's schema.
-- Turns an abandoned external intake request into a lead: writes a synthetic
-- inbound message and an internal note into the owning tenant's schema, then
-- flags the intake as 'abandoned_lead'.
--   p_intake_id : id of the row in public.patient_intake_requests.
--   returns     : p_intake_id, both on first conversion and on repeat calls.
--   raises      : 'intake_not_found', 'tenant_not_resolved', 'schema_not_found'.
CREATE OR REPLACE FUNCTION public.convert_abandoned_intake_to_lead(p_intake_id uuid)
RETURNS uuid
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  v_intake     RECORD;
  v_tenant_id  uuid;
  v_schema     text;
  v_thread_key text;
  v_phone      text;
  v_note_body  text;
  v_admin_id   uuid;
BEGIN
  SELECT * INTO v_intake
    FROM public.patient_intake_requests
   WHERE id = p_intake_id;
  IF NOT FOUND THEN
    RAISE EXCEPTION 'intake_not_found';
  END IF;

  -- Already converted: nothing to do. lead_thread_key holds 'anon:<phone>' (see
  -- the UPDATE at the end), which is not a uuid, so casting it — as this branch
  -- used to — raised on every repeat call. Return the same value as the normal path.
  IF v_intake.status = 'abandoned_lead' THEN
    RETURN p_intake_id;
  END IF;

  -- Tenant of the intake owner, preferring memberships with an admin role.
  SELECT tenant_id INTO v_tenant_id
    FROM public.tenant_members
   WHERE user_id = v_intake.owner_id
   ORDER BY CASE role WHEN 'tenant_admin' THEN 1 WHEN 'clinic_admin' THEN 2 ELSE 3 END
   LIMIT 1;
  IF v_tenant_id IS NULL THEN
    RAISE EXCEPTION 'tenant_not_resolved';
  END IF;

  v_schema := public.tenant_schema_for(v_tenant_id);
  IF v_schema IS NULL THEN
    RAISE EXCEPTION 'schema_not_found';
  END IF;

  -- Digits only; 10–11 digit numbers get the '55' prefix; empty → 'unknown'.
  v_phone := regexp_replace(COALESCE(v_intake.telefone,''),'\D','','g');
  IF length(v_phone) BETWEEN 10 AND 11 THEN
    v_phone := '55'||v_phone;
  END IF;
  IF v_phone = '' THEN
    v_phone := 'unknown';
  END IF;
  v_thread_key := 'anon:'||v_phone;

  v_note_body := format('📋 Lead abandonado (cadastro externo):%s%sNome: %s%sTelefone: %s%sE-mail: %s%sMotivo/Observacoes: %s%s%sIniciou em: %s · Ultima atualizacao: %s',
    E'\n', E'\n',
    COALESCE(v_intake.nome_completo,'—'), E'\n',
    COALESCE(v_intake.telefone,'—'), E'\n',
    COALESCE(v_intake.email_principal,'—'), E'\n',
    COALESCE(v_intake.onde_nos_conheceu,'—'), E'\n', E'\n',
    to_char(v_intake.created_at AT TIME ZONE 'America/Sao_Paulo','DD/MM HH24:MI'),
    to_char(COALESCE(v_intake.last_progress_at, v_intake.updated_at) AT TIME ZONE 'America/Sao_Paulo','DD/MM HH24:MI'));

  -- Note author: an active admin of the tenant, falling back to the intake owner.
  SELECT user_id INTO v_admin_id
    FROM public.tenant_members
   WHERE tenant_id = v_tenant_id
     AND role IN ('tenant_admin','clinic_admin')
     AND status='active'
   LIMIT 1;
  IF v_admin_id IS NULL THEN
    v_admin_id := v_intake.owner_id;
  END IF;

  -- From here on unqualified tables resolve to the tenant's schema.
  -- NOTE(review): the schema name is concatenated unquoted; assumes
  -- tenant_schema_for() returns a plain identifier — confirm.
  PERFORM set_config('search_path', v_schema || ',public,pg_temp', true);

  INSERT INTO conversation_messages (
    channel, direction, from_number, to_number, body, provider, provider_raw, kanban_status)
  VALUES (
    'whatsapp',
    'inbound',
    CASE WHEN v_phone='unknown' THEN NULL ELSE v_phone END,
    NULL,
    format('🧾 Cadastro externo iniciado e não finalizado. %s entrou em contato via link público mas abandonou o formulário — ver nota interna.', COALESCE(v_intake.nome_completo,'Visitante')),
    'system',
    jsonb_build_object('lead_from_abandoned_intake', true, 'intake_id', v_intake.id),
    'awaiting_us');

  INSERT INTO conversation_notes (thread_key, contact_number, body, created_by)
  VALUES (
    v_thread_key,
    CASE WHEN v_phone='unknown' THEN NULL ELSE v_phone END,
    v_note_body,
    v_admin_id);

  UPDATE public.patient_intake_requests
     SET status='abandoned_lead',
         lead_thread_key=v_thread_key,
         updated_at=now()
   WHERE id = p_intake_id;

  RETURN p_intake_id;
END $$;

-- first_response analytics: routed by p_tenant_id (each function sets its own
-- search_path — _first_response_runs has its own SET search_path, which would reset it).
-- One row per thread that received an inbound message in [p_from, p_to):
-- when the first inbound arrived, when the first outbound at or after it was
-- sent (NULL if none), the gap in whole seconds, and the thread's assignee.
-- Threads are keyed by patient_id, or 'anon:<number>' when there is no patient.
-- NOTE(review): public._tenant_route() is defined elsewhere; presumably it
-- enforces tenant access before returning the schema name — confirm.
CREATE OR REPLACE FUNCTION public._first_response_runs(
  p_tenant_id uuid,
  p_from timestamptz,
  p_to timestamptz)
RETURNS TABLE(
  thread_key text,
  inbound_started_at timestamptz,
  responded_at timestamptz,
  response_seconds integer,
  responder_id uuid)
LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
BEGIN
  PERFORM set_config('search_path', public._tenant_route(p_tenant_id) || ',public,pg_temp', true);

  RETURN QUERY
  WITH base AS (
    -- messages in the window, tagged with their thread key
    SELECT
      COALESCE(
        m.patient_id::text,
        'anon:' || COALESCE(CASE WHEN m.direction='inbound' THEN m.from_number ELSE m.to_number END, 'unknown')
      ) AS thread_key,
      m.direction,
      m.created_at
    FROM conversation_messages m
    WHERE m.created_at >= p_from
      AND m.created_at < p_to
  ),
  inbound AS (
    -- earliest inbound message per thread
    SELECT
      b.thread_key AS tk,
      min(b.created_at) AS inbound_started_at
    FROM base b
    WHERE b.direction='inbound'
    GROUP BY b.thread_key
  )
  SELECT
    i.tk,
    i.inbound_started_at,
    reply.first_outbound_at AS responded_at,
    EXTRACT(EPOCH FROM (reply.first_outbound_at - i.inbound_started_at))::int AS response_seconds,
    a.assigned_to AS responder_id
  FROM inbound i
  -- an aggregate without GROUP BY always yields exactly one row, so this never
  -- drops or multiplies threads; first_outbound_at is NULL when nobody replied
  CROSS JOIN LATERAL (
    SELECT min(b2.created_at) AS first_outbound_at
    FROM base b2
    WHERE b2.thread_key = i.tk
      AND b2.direction='outbound'
      AND b2.created_at >= i.inbound_started_at
  ) reply
  LEFT JOIN conversation_assignments a
    ON a.thread_key = i.tk;
END $$;

-- Aggregate first-response metrics for a tenant over [p_from, p_to), optionally
-- restricted to threads assigned to p_therapist_id. Only answered threads count.
-- The SLA threshold comes from conversation_sla_rules (first row found),
-- defaulting to 30 minutes when there is none.
CREATE OR REPLACE FUNCTION public.first_response_stats(
  p_tenant_id uuid,
  p_from timestamptz DEFAULT (now() - interval '30 days'),
  p_to timestamptz DEFAULT now(),
  p_therapist_id uuid DEFAULT NULL)
RETURNS TABLE(
  runs_count integer,
  avg_seconds integer,
  median_seconds integer,
  min_seconds integer,
  max_seconds integer,
  sla_threshold_seconds integer,
  sla_compliant_count integer,
  sla_compliance_rate numeric)
LANGUAGE plpgsql STABLE SECURITY DEFINER
SET search_path TO 'public','pg_temp'
AS $$
DECLARE
  v_threshold_min integer;
BEGIN
  PERFORM set_config('search_path', public._tenant_route(p_tenant_id) || ',public,pg_temp', true);

  SELECT threshold_minutes INTO v_threshold_min
    FROM conversation_sla_rules
   LIMIT 1;
  v_threshold_min := COALESCE(v_threshold_min, 30);

  RETURN QUERY
  WITH runs AS (
    SELECT r.response_seconds
    FROM public._first_response_runs(p_tenant_id, p_from, p_to) r
    WHERE r.responded_at IS NOT NULL
      AND (p_therapist_id IS NULL OR r.responder_id = p_therapist_id)
  )
  SELECT
    count(*)::int,
    COALESCE(avg(response_seconds),0)::int,
    COALESCE(percentile_cont(0.5) WITHIN GROUP (ORDER BY response_seconds),0)::int,
    COALESCE(min(response_seconds),0)::int,
    COALESCE(max(response_seconds),0)::int,
    (v_threshold_min*60)::int,
    count(*) FILTER (WHERE response_seconds <= v_threshold_min*60)::int,
    CASE
      WHEN count(*)=0 THEN 0
      ELSE round(100.0*count(*) FILTER (WHERE response_seconds <= v_threshold_min*60)/count(*),1)
    END
  FROM runs;
END $$;

-- Service RPCs (cron/edge): service_role/postgres only. No membership check.
DO $grants$
DECLARE
  v_signature text;
BEGIN
  FOR v_signature IN
    SELECT unnest(ARRAY[
      'sla_open_breach(uuid,text,uuid,timestamptz,integer)',
      'sla_mark_notified(uuid,uuid)',
      'whatsapp_heartbeat_open_incident(uuid,uuid,text,text,jsonb)',
      'whatsapp_heartbeat_mark_notified(uuid,uuid)',
      'whatsapp_heartbeat_resolve_open_incidents(uuid,uuid)',
      'convert_abandoned_intake_to_lead(uuid)',
      'cleanup_notification_queue()',
      'unstick_notification_queue()',
      'sync_overdue_financial_records()',
      'populate_notification_queue()'
    ])
  LOOP
    EXECUTE format('REVOKE ALL ON FUNCTION public.%s FROM PUBLIC, anon, authenticated', v_signature);
    EXECUTE format('GRANT EXECUTE ON FUNCTION public.%s TO service_role', v_signature);
  END LOOP;
END $grants$;

COMMIT;