/*
|--------------------------------------------------------------------------
| Agência PSI — Edge Function: process-notification-queue
|--------------------------------------------------------------------------
| Processes notification_queue rows with channel = 'whatsapp' through the
| Evolution API.
|
| schema-per-tenant: notification_queue/channels/templates/logs live in the
| physical schema of each tenant (NO tenant_id column). The cron SWEEPS all
| active tenants; when body.tenant_id is supplied only that tenant is
| processed (single mode).
|
| Per-item flow:
|  1. Fetch pending rows (channel='whatsapp', status='pendente',
|     scheduled_at <= now, next_retry_at null or due)
|  2. Mark as 'processando' (optimistic lock, verified by the returned row)
|  3. Fetch Evolution API credentials from notification_channels (tdb)
|  4. Resolve the template (every template in the schema belongs to the tenant)
|  5. Render {{var}} placeholders
|  6. Send through the Evolution API (sendText)
|  7. Update the queue row + insert into notification_logs (tdb)
|  8. On error: retry with backoff, or mark as 'falhou'
|--------------------------------------------------------------------------
*/

import type { SupabaseClient } from 'https://esm.sh/@supabase/supabase-js@2'
import { adminClient, listTenantSchemas, tenantDbForId, schemaForTenant } from '../_shared/tenant.ts'

const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
  'Access-Control-Allow-Methods': 'POST, OPTIONS',
}

/** Attempt ceiling used when a queue row does not carry its own max_attempts. */
const DEFAULT_MAX_ATTEMPTS = 5
/** Maximum number of queue rows handled per tenant per invocation. */
const BATCH_SIZE = 20
/** Linear backoff step: the n-th failed attempt waits n * RETRY_STEP_MS. */
const RETRY_STEP_MS = 2 * 60 * 1000

// ── Small shared helpers ───────────────────────────────────────

type JsonObject = Record<string, unknown>

/** Returns `value` as a plain key/value bag, or null when it is not an object. */
function asObject(value: unknown): JsonObject | null {
  return typeof value === 'object' && value !== null ? (value as JsonObject) : null
}

/** Extracts a printable message from whatever a `catch` clause received. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err)
}

/** Logs a database error returned (not thrown) by a Supabase call; no-op when there is none. */
function reportDbError(action: string, itemId: string, error: { message: string } | null | undefined): void {
  if (error) {
    console.error(`[process-notification-queue] ${action} falhou para item ${itemId}:`, error.message)
  }
}

// ── Template renderer ──────────────────────────────────────────

/**
 * Replaces every `{{key}}` placeholder in `template` with the matching entry
 * of `variables`.
 *
 * Keys may contain word characters and dots; the whole key (dots included) is
 * looked up as a single flat property name. Missing, null or undefined values
 * render as an empty string.
 *
 * @param template  Template text; an empty template yields ''.
 * @param variables Flat map of placeholder name → value.
 * @returns The rendered text.
 */
function renderTemplate(template: string, variables: Record<string, unknown>): string {
  if (!template) return ''
  return template.replace(/\{\{([\w.]+)\}\}/g, (_match, key: string) => {
    // Own properties only: `{{constructor}}` must not render an inherited
    // Object.prototype member.
    if (!Object.prototype.hasOwnProperty.call(variables, key)) return ''
    const val = variables[key]
    return val !== undefined && val !== null ? String(val) : ''
  })
}

// ── Evolution API sender ───────────────────────────────────────

interface EvolutionCredentials {
  api_url: string
  api_key: string
  instance_name: string
}

/** Builds a readable error detail from an Evolution API error payload ('' when none is found). */
function describeApiError(data: JsonObject | null): string {
  // NOTE(review): `response.message` / `error` are fallbacks for error shapes
  // the API presumably returns — confirm against the deployed version.
  const detail = data?.message || asObject(data?.response)?.message || data?.error
  if (detail === undefined || detail === null || detail === '') return ''
  // Non-string details (arrays/objects) would otherwise print "[object Object]".
  return typeof detail === 'string' ? detail : JSON.stringify(detail)
}

/**
 * Sends a plain-text WhatsApp message through the Evolution API `sendText`
 * endpoint of the configured instance.
 *
 * @param credentials Base URL, API key and instance name of the channel.
 * @param recipient   Destination number, forwarded as `number`.
 * @param message     Text to send.
 * @returns The provider message id (when the response carries one) and the status 'sent'.
 * @throws Error when the HTTP response is not ok; the message carries the
 *         provider detail when available, otherwise the HTTP status.
 */
async function sendWhatsapp(
  credentials: EvolutionCredentials,
  recipient: string,
  message: string
): Promise<{ messageId?: string; status: string }> {
  // Tolerate a trailing slash in the configured base URL.
  const baseUrl = credentials.api_url.replace(/\/+$/, '')
  const url = `${baseUrl}/message/sendText/${credentials.instance_name}`

  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'apikey': credentials.api_key,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      number: recipient,
      text: message,
    }),
  })

  // Read as text first: gateways may answer with HTML or an empty body, and a
  // JSON parse failure must not hide the real HTTP status.
  const raw = await res.text()
  let parsed: unknown = null
  try {
    parsed = raw ? JSON.parse(raw) : null
  } catch {
    parsed = null
  }
  const data = asObject(parsed)

  if (!res.ok) {
    throw new Error(describeApiError(data) || `Evolution API error ${res.status}`)
  }

  const rawId = asObject(data?.key)?.id || data?.messageId
  return {
    messageId: rawId ? String(rawId) : undefined,
    status: 'sent',
  }
}

// ── Processes the queue of ONE tenant ──────────────────────────

type Result = { id: string; status: string; error?: string }

/**
 * Columns of notification_queue that this function reads.
 * NOTE(review): column types are assumed from usage — confirm against the schema.
 */
interface QueueItem {
  id: string
  owner_id: string
  attempts: number
  max_attempts?: number | null
  template_key: string
  schedule_key?: unknown
  recipient_address: string
  resolved_vars?: Record<string, unknown> | null
  agenda_evento_id?: unknown
  patient_id?: unknown
}

/**
 * Returns the active WhatsApp channel credentials of `ownerId`, memoized in
 * `cache` (a cached null means "looked up, none found").
 *
 * @throws Error when the lookup fails or no active channel exists.
 */
async function resolveCredentials(
  tdb: SupabaseClient,
  ownerId: string,
  cache: Map<string, EvolutionCredentials | null>
): Promise<EvolutionCredentials> {
  let credentials = cache.get(ownerId)

  if (credentials === undefined) {
    const { data: channel, error } = await tdb
      .from('notification_channels')
      .select('credentials')
      .eq('owner_id', ownerId)
      .eq('channel', 'whatsapp')
      .eq('is_active', true)
      .is('deleted_at', null)
      .maybeSingle()

    // A failed lookup is not cached, so it is never mistaken for "no channel".
    if (error) throw new Error(`Falha ao buscar canal WhatsApp: ${error.message}`)

    credentials = (channel?.credentials as EvolutionCredentials | null) ?? null
    cache.set(ownerId, credentials)
  }

  if (!credentials) {
    throw new Error('Canal WhatsApp não encontrado ou inativo para este owner')
  }
  return credentials
}

/**
 * Resolves the template body for a queue item: the owner's own active
 * template is preferred; otherwise the schema's active default template.
 *
 * @throws Error when neither yields a non-empty body.
 */
async function resolveTemplateBody(tdb: SupabaseClient, item: QueueItem): Promise<string> {
  let templateBody: string | null = null

  const { data: ownerTpl } = await tdb
    .from('notification_templates')
    .select('body_text')
    .eq('owner_id', item.owner_id)
    .eq('key', item.template_key)
    .eq('channel', 'whatsapp')
    .eq('is_active', true)
    .is('deleted_at', null)
    .maybeSingle()

  if (ownerTpl) {
    templateBody = ownerTpl.body_text
  } else {
    const { data: defaultTpl } = await tdb
      .from('notification_templates')
      .select('body_text')
      .eq('key', item.template_key)
      .eq('channel', 'whatsapp')
      .eq('is_default', true)
      .eq('is_active', true)
      .is('deleted_at', null)
      .maybeSingle()

    templateBody = defaultTpl?.body_text || null
  }

  if (!templateBody) {
    throw new Error(`Template WhatsApp não encontrado: ${item.template_key}`)
  }
  return templateBody
}

/**
 * Marks a delivered item as 'enviado' and writes the success log row.
 * Never throws: the message has already been sent, so a bookkeeping failure
 * must not push the item back into the retry path (that would re-send it).
 */
async function recordSuccess(
  tdb: SupabaseClient,
  item: QueueItem,
  message: string,
  vars: Record<string, unknown>,
  providerMessageId: string | undefined
): Promise<void> {
  const sentAt = new Date().toISOString()
  try {
    const { error: updateErr } = await tdb
      .from('notification_queue')
      .update({
        status: 'enviado',
        sent_at: sentAt,
        provider_message_id: providerMessageId || null,
      })
      .eq('id', item.id)
    reportDbError('update enviado', item.id, updateErr)

    const { error: logErr } = await tdb.from('notification_logs').insert({
      owner_id: item.owner_id,
      queue_id: item.id,
      agenda_evento_id: item.agenda_evento_id,
      patient_id: item.patient_id,
      channel: 'whatsapp',
      template_key: item.template_key,
      schedule_key: item.schedule_key,
      recipient_address: item.recipient_address,
      resolved_message: message,
      resolved_vars: vars,
      status: 'sent',
      provider: 'evolution_api',
      provider_message_id: providerMessageId || null,
      sent_at: sentAt,
    })
    reportDbError('insert log sent', item.id, logErr)
  } catch (err) {
    console.error(`[process-notification-queue] registro de envio falhou para item ${item.id}:`, errorMessage(err))
  }
}

/**
 * Records a failed attempt: returns the item to 'pendente' with a linear
 * backoff in next_retry_at, or marks it 'falhou' once attempts are exhausted,
 * and writes the failure log row.
 *
 * @returns The per-item result ('retry' or 'falhou') including the reason.
 */
async function recordFailure(tdb: SupabaseClient, item: QueueItem, reason: string): Promise<Result> {
  // The lock step already persisted attempts + 1 for this run.
  const attempts = item.attempts + 1
  const maxAttempts = item.max_attempts || DEFAULT_MAX_ATTEMPTS
  const isExhausted = attempts >= maxAttempts
  const retryMs = attempts * RETRY_STEP_MS

  const { error: updateErr } = await tdb
    .from('notification_queue')
    .update({
      status: isExhausted ? 'falhou' : 'pendente',
      last_error: reason,
      next_retry_at: isExhausted ? null : new Date(Date.now() + retryMs).toISOString(),
    })
    .eq('id', item.id)
  reportDbError('update retry', item.id, updateErr)

  const { error: logErr } = await tdb.from('notification_logs').insert({
    owner_id: item.owner_id,
    queue_id: item.id,
    agenda_evento_id: item.agenda_evento_id,
    patient_id: item.patient_id,
    channel: 'whatsapp',
    template_key: item.template_key,
    schedule_key: item.schedule_key,
    recipient_address: item.recipient_address,
    status: 'failed',
    failure_reason: reason,
    failed_at: new Date().toISOString(),
  })
  reportDbError('insert log failed', item.id, logErr)

  return { id: item.id, status: isExhausted ? 'falhou' : 'retry', error: reason }
}

/**
 * Processes up to BATCH_SIZE due WhatsApp notifications of a single tenant.
 *
 * @param tdb Client bound to the tenant's schema.
 * @returns One result per fetched item: 'enviado', 'retry', 'falhou' or 'skip'.
 * @throws Error when the initial queue fetch fails.
 */
async function processTenantQueue(tdb: SupabaseClient): Promise<Result[]> {
  const now = new Date().toISOString()

  // 1. Fetch due WhatsApp items for this tenant. The next_retry_at condition
  //    makes the backoff written on failure effective.
  const { data, error: fetchErr } = await tdb
    .from('notification_queue')
    .select('*')
    .eq('channel', 'whatsapp')
    .eq('status', 'pendente')
    .lte('scheduled_at', now)
    .lt('attempts', DEFAULT_MAX_ATTEMPTS)
    .or(`next_retry_at.is.null,next_retry_at.lte.${now}`)
    .order('scheduled_at', { ascending: true })
    .limit(BATCH_SIZE)

  if (fetchErr) throw new Error(fetchErr.message)

  const items = (data ?? []) as QueueItem[]
  if (!items.length) return []

  const results: Result[] = []

  // Per-owner credentials cache to avoid repeating the same query.
  const credentialsCache = new Map<string, EvolutionCredentials | null>()

  for (const item of items) {
    // 2. Optimistic lock. The update succeeds without error even when another
    //    worker already took the row, so the returned rows are what prove
    //    ownership.
    const { data: locked, error: lockErr } = await tdb
      .from('notification_queue')
      .update({ status: 'processando', attempts: item.attempts + 1 })
      .eq('id', item.id)
      .eq('status', 'pendente')
      .select('id')

    if (lockErr || !locked?.length) {
      results.push({ id: item.id, status: 'skip', error: 'lock failed' })
      continue
    }

    const vars = item.resolved_vars || {}
    let message: string
    let providerMessageId: string | undefined

    try {
      // 3. Evolution API credentials (cached per owner)
      const credentials = await resolveCredentials(tdb, item.owner_id, credentialsCache)

      // 4. Template: owner's own, falling back to the schema default
      const templateBody = await resolveTemplateBody(tdb, item)

      // 5. Render placeholders
      message = renderTemplate(templateBody, vars)

      // 6. Send through the Evolution API
      const sendResult = await sendWhatsapp(credentials, item.recipient_address, message)
      providerMessageId = sendResult.messageId
    } catch (err) {
      // 8. Error — retry with backoff, or give up
      results.push(await recordFailure(tdb, item, errorMessage(err)))
      continue
    }

    // 7. Success — bookkeeping is kept outside the retry path on purpose
    await recordSuccess(tdb, item, message, vars, providerMessageId)
    results.push({ id: item.id, status: 'enviado' })
  }

  return results
}

// ── Main handler ───────────────────────────────────────────────

/** Builds a JSON response carrying the CORS headers. */
function jsonResponse(payload: unknown, status: number): Response {
  return new Response(JSON.stringify(payload), {
    status,
    headers: { ...corsHeaders, 'Content-Type': 'application/json' },
  })
}

Deno.serve(async (req: Request) => {
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders })
  }

  const admin = adminClient()

  // Single-tenant mode when body.tenant_id is present; otherwise sweep all.
  let bodyTenantId: string | null = null
  try {
    const body: unknown = await req.json()
    const tenantId = asObject(body)?.tenant_id
    bodyTenantId = tenantId === undefined || tenantId === null ? null : String(tenantId)
  } catch {
    // no body / invalid body → sweep mode
  }

  const results: Result[] = []
  const errors: Array<{ tenantId: string; error: string }> = []

  try {
    if (bodyTenantId) {
      const schema = await schemaForTenant(admin, bodyTenantId)
      if (!schema) {
        return jsonResponse({ error: `schema indisponível para tenant ${bodyTenantId}` }, 404)
      }
      const tdb = await tenantDbForId(admin, bodyTenantId)
      results.push(...await processTenantQueue(tdb))
    } else {
      const tenants = await listTenantSchemas(admin)
      for (const t of tenants) {
        // One failing tenant must not stop the sweep of the others.
        try {
          const tdb = admin.schema(t.schema)
          results.push(...await processTenantQueue(tdb))
        } catch (e) {
          const reason = errorMessage(e)
          console.error(`[process-notification-queue] tenant ${t.tenantId} falhou:`, reason)
          errors.push({ tenantId: t.tenantId, error: reason })
        }
      }
    }
  } catch (err) {
    return jsonResponse({ error: errorMessage(err) }, 500)
  }

  if (!results.length && !errors.length) {
    return jsonResponse({ message: 'Nenhum WhatsApp na fila', processed: 0 }, 200)
  }

  const sent = results.filter(r => r.status === 'enviado').length
  const failed = results.filter(r => r.status === 'falhou').length
  const retried = results.filter(r => r.status === 'retry').length

  return jsonResponse(
    { processed: results.length, sent, failed, retried, details: results, tenantErrors: errors },
    200
  )
})