Files
agenciapsilmno/supabase/functions/process-sms-queue/index.ts
T
Leonardo 9b21642e15 F4 schema-per-tenant: edge functions roteiam pro schema do tenant
- _shared/tenant.ts: helper (adminClient, tenantDbForId, schemaForTenant,
  listTenantSchemas, resolveTenantByChannel, tenantSchemaName)
- _shared/whatsapp-hooks.ts: hooks de tabela tenant recebem tdb; RPCs de
  credito (deduct/add_whatsapp_credits) e tenant_members seguem em supa+p_tenant_id
- inbound (twilio/evolution): tenant_id da URL -> tdb pra conversation_messages
  e notification_channels
- crons de fila (process-notification/email/sms/whatsapp-queue): varrem
  listTenantSchemas e drenam a fila de cada schema (Q3: filas sao per-tenant);
  modo single-tenant se body.tenant_id vier
- crons reminders/checks (send-session-reminders, conversation-sla-check,
  whatsapp-heartbeat-check, convert-abandoned-intakes, sync-email-templates):
  loop por tenant
- routing por tenant_id (send-whatsapp-message, send-session-reminder-manual,
  twilio-provision, de/reactivate-channel, twilio-webhook): tenantDbForId;
  channel-actions sem tenant_id varrem schemas por channel_id
- asaas-*: tenant_id do body -> tdb; asaas-webhook fica global (whatsapp_credit_purchases)
- notification-webhook (Meta): resolve tenant via channel_routing por phone_number_id,
  fan-out por message_id quando nao resolve
- caller send-session-reminder-manual passa tenant_id (evento vive no schema)

Pendente: save-intake-progress e fluxos anon por token (decisao de roteamento)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 08:44:09 -03:00

369 lines
13 KiB
TypeScript

/*
|--------------------------------------------------------------------------
| Agência PSI — Edge Function: process-sms-queue
|--------------------------------------------------------------------------
| Processa a notification_queue para channel = 'sms' via Twilio.
|
| Modelo: Créditos da plataforma
| - Credenciais Twilio são da plataforma (env vars)
| - Antes de enviar, debita 1 crédito do tenant via RPC
| - Sem crédito → marca como 'sem_credito'
|
| schema-per-tenant: notification_queue/templates/logs vivem no schema físico
| de cada tenant (SEM coluna tenant_id). O cron VARRE todos os tenants; se vier
| body.tenant_id, processa só aquele (modo single). addon_credits é GLOBAL
| (admin) e a RPC debit_addon_credit continua em admin.rpc com p_tenant_id.
|
| Fluxo:
| 1. Busca pendentes (channel='sms', status='pendente', scheduled_at <= now)
| 2. Lock otimista (status → processando)
| 3. Debita crédito SMS do tenant (RPC, admin + p_tenant_id)
| 4. Resolve template (override do owner; senão default do schema)
| 5. Renderiza variáveis {{var}}
| 6. Busca from_number_override do tenant (addon_credits é global → admin)
| 7. Envia via Twilio REST API (credenciais da plataforma) ou mock em dev/test
| 8. Atualiza queue + insere notification_logs (tdb)
| 9. Retry com backoff em caso de erro
|--------------------------------------------------------------------------
*/
import type { SupabaseClient } from 'https://esm.sh/@supabase/supabase-js@2'
import { adminClient, listTenantSchemas, tenantDbForId, schemaForTenant } from '../_shared/tenant.ts'
// CORS headers shared by the responses built in this file: any origin is
// allowed, and only POST plus the OPTIONS preflight are advertised.
const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': ['authorization', 'x-client-info', 'apikey', 'content-type'].join(', '),
  'Access-Control-Allow-Methods': ['POST', 'OPTIONS'].join(', '),
}
// ── Template renderer ──────────────────────────────────────────
/**
 * Replaces every `{{key}}` placeholder in `template` with the matching value
 * from `variables`.
 *
 * Lookup order for a placeholder such as `{{patient.name}}`:
 *   1. an own property literally named `patient.name` (flat key), then
 *   2. the nested path `variables.patient.name`.
 *
 * Only own properties are consulted, so placeholders like `{{constructor}}`
 * or `{{toString}}` never leak inherited members into the message.
 *
 * @param template   Text containing `{{key}}` placeholders; an empty/falsy
 *                   template yields an empty string.
 * @param variables  Values to substitute.
 * @returns The rendered text. Missing, `null` or `undefined` values render as
 *          an empty string; every other value goes through `String()`.
 */
function renderTemplate(template: string, variables: Record<string, unknown>): string {
  if (!template) return ''

  // Reads `key` from `obj` only when it is an own property of an object.
  const own = (obj: unknown, key: string): unknown =>
    obj !== null && typeof obj === 'object' && Object.prototype.hasOwnProperty.call(obj, key)
      ? (obj as Record<string, unknown>)[key]
      : undefined

  return template.replace(/\{\{([\w.]+)\}\}/g, (_match, key: string) => {
    let val = own(variables, key)
    // The placeholder syntax allows dots, so fall back to walking the path
    // when no flat key with that exact name carries a value.
    if ((val === undefined || val === null) && key.includes('.')) {
      val = key.split('.').reduce<unknown>((acc, part) => own(acc, part), variables)
    }
    return val !== undefined && val !== null ? String(val) : ''
  })
}
// ── Twilio SMS sender (credenciais da plataforma) ──────────────
/**
 * Sends one SMS through the Twilio Messages REST endpoint using the platform
 * credentials read from the environment (`TWILIO_ACCOUNT_SID`,
 * `TWILIO_AUTH_TOKEN`).
 *
 * @param to            Destination number, sent to Twilio as `To`.
 * @param body          Message text, sent to Twilio as `Body`.
 * @param fromOverride  Optional sender; when empty, `TWILIO_FROM_NUMBER` is used.
 * @returns The `sid` and `status` fields of Twilio's response.
 * @throws Error when a required env var is missing, or when Twilio answers
 *         with a non-2xx status (using Twilio's own message when present).
 */
async function sendViaTwilio(
  to: string,
  body: string,
  fromOverride?: string | null
): Promise<{ sid: string; status: string }> {
  const accountSid = Deno.env.get('TWILIO_ACCOUNT_SID')
  const authToken = Deno.env.get('TWILIO_AUTH_TOKEN')
  const fromNumber = fromOverride || Deno.env.get('TWILIO_FROM_NUMBER')

  // Fail with a readable message instead of calling Twilio with
  // "undefined" credentials or an empty sender.
  if (!accountSid || !authToken) {
    throw new Error('Twilio credentials not configured (TWILIO_ACCOUNT_SID / TWILIO_AUTH_TOKEN)')
  }
  if (!fromNumber) {
    throw new Error('Twilio sender not configured (TWILIO_FROM_NUMBER)')
  }

  const url = `https://api.twilio.com/2010-04-01/Accounts/${accountSid}/Messages.json`
  const auth = btoa(`${accountSid}:${authToken}`)
  const params = new URLSearchParams()
  params.set('From', fromNumber)
  params.set('To', to)
  params.set('Body', body)

  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'Authorization': `Basic ${auth}`,
      'Content-Type': 'application/x-www-form-urlencoded',
    },
    body: params.toString(),
  })

  // Read the body as text first: an HTML error page or an empty body must not
  // surface as a JSON SyntaxError that hides the HTTP status.
  const raw = await res.text()
  let data: { sid?: string; status?: string; message?: string; code?: number | string } = {}
  try {
    const parsed: unknown = raw ? JSON.parse(raw) : null
    if (parsed !== null && typeof parsed === 'object') {
      data = parsed as typeof data
    }
  } catch {
    // Non-JSON body: keep `data` empty and rely on the status code below.
  }

  if (!res.ok) {
    throw new Error(data.message || `Twilio error ${res.status}: ${data.code ?? raw.slice(0, 200)}`)
  }
  // A 2xx means Twilio accepted the message; do not throw here even if the
  // body was unreadable, otherwise the caller would retry and send twice.
  return { sid: data.sid ?? '', status: data.status ?? 'unknown' }
}
// ── Mock sender (dev/test) ─────────────────────────────────────
/**
 * Tells whether SMS sending should be simulated instead of calling Twilio.
 * True when `TWILIO_ACCOUNT_SID` starts with `AC_TEST`, or when the `DEV`
 * env var is exactly the string `true`.
 */
function isMockMode(): boolean {
  if ((Deno.env.get('TWILIO_ACCOUNT_SID') ?? '').startsWith('AC_TEST')) {
    return true
  }
  return Deno.env.get('DEV') === 'true'
}
/**
 * Dev/test stand-in for the Twilio sender: logs the message to the console
 * and returns a fabricated result instead of sending anything.
 *
 * @returns `sid` made of `mock_` plus the first 8 characters of a random
 *          UUID, and `status` fixed to `sent`.
 */
function mockSend(_to: string, _body: string): { sid: string; status: string } {
  const suffix = crypto.randomUUID().substring(0, 8)
  const sid = 'mock_' + suffix
  console.log('[SMS MOCK] To: ' + _to + ' | Body: ' + _body + ' | SID: ' + sid)
  return { sid, status: 'sent' }
}
// ── Processa a fila de UM tenant ───────────────────────────────
/** Outcome of one queue item, as reported back in the HTTP response. */
type Result = { id: string; status: string; error?: string }

/**
 * Processes up to 20 due SMS items from one tenant's `notification_queue`.
 *
 * For each item: claims it (pendente → processando), debits one SMS credit
 * through the `debit_addon_credit` RPC, resolves and renders the template,
 * sends it (Twilio or mock) and records the outcome in `notification_queue`
 * and `notification_logs`.
 *
 * @param admin     Client used for the credit RPC and the `addon_credits` table.
 * @param tdb       Client used for the tenant's queue, templates and logs.
 * @param tenantId  Tenant id passed to the credit RPC and the override lookup.
 * @returns One entry per item handled, with status `enviado`, `sem_credito`,
 *          `retry`, `falhou` or `skip`.
 * @throws Error when the initial queue fetch fails.
 */
async function processTenantQueue(
  admin: SupabaseClient,
  tdb: SupabaseClient,
  tenantId: string
): Promise<Result[]> {
  const now = new Date().toISOString()

  // 1. Fetch due items. Rows whose next_retry_at is still in the future are
  //    left alone so the backoff written by the error path is honoured.
  const { data: items, error: fetchErr } = await tdb
    .from('notification_queue')
    .select('*')
    .eq('channel', 'sms')
    .eq('status', 'pendente')
    .lte('scheduled_at', now)
    .or(`next_retry_at.is.null,next_retry_at.lte.${now}`)
    .order('scheduled_at', { ascending: true })
    .limit(20)
  if (fetchErr) throw new Error(fetchErr.message)
  if (!items?.length) return []

  const results: Result[] = []
  for (const item of items) {
    const maxAttempts = item.max_attempts || 5
    // Items that already used all their attempts are not touched.
    if (item.attempts >= maxAttempts) continue

    // 2. Optimistic lock. An update that matches zero rows is NOT an error,
    //    so ask for the updated rows back and only proceed when this run
    //    really flipped the item from 'pendente' to 'processando'.
    const { data: locked, error: lockErr } = await tdb
      .from('notification_queue')
      .update({ status: 'processando', attempts: item.attempts + 1 })
      .eq('id', item.id)
      .eq('status', 'pendente')
      .select('id')
    if (lockErr || !locked?.length) {
      results.push({ id: item.id, status: 'skip', error: 'lock failed' })
      continue
    }

    // Fields shared by every notification_logs row written for this item.
    const logBase = {
      owner_id: item.owner_id,
      queue_id: item.id,
      agenda_evento_id: item.agenda_evento_id,
      patient_id: item.patient_id,
      channel: 'sms',
      template_key: item.template_key,
      schedule_key: item.schedule_key,
      recipient_address: item.recipient_address,
    }

    try {
      // 3. Debit one SMS credit for this tenant.
      // NOTE(review): the debit happens before template resolution and the
      // actual send; a later failure retries the item and calls the RPC
      // again. Confirm debit_addon_credit is idempotent per p_queue_id.
      const { data: debitResult, error: debitErr } = await admin
        .rpc('debit_addon_credit', {
          p_tenant_id: tenantId,
          p_addon_type: 'sms',
          p_queue_id: item.id,
          p_description: `SMS para ${item.recipient_address}`,
        })
      if (debitErr) {
        throw new Error(`Erro ao debitar crédito: ${debitErr.message}`)
      }
      if (!debitResult?.success) {
        // No credit: do not send, mark the item as 'sem_credito' and log it.
        await tdb
          .from('notification_queue')
          .update({ status: 'sem_credito', last_error: debitResult?.reason || 'Sem créditos SMS' })
          .eq('id', item.id)
        await tdb.from('notification_logs').insert({
          ...logBase,
          status: 'failed',
          failure_reason: `Sem créditos SMS: ${debitResult?.reason || 'balance=0'}`,
          failed_at: new Date().toISOString(),
        })
        results.push({ id: item.id, status: 'sem_credito', error: debitResult?.reason })
        continue
      }

      // 4. Resolve the template: prefer the owner's active template, otherwise
      //    the active default one.
      let template: { body_text: string } | null = null
      const { data: ownerTpl } = await tdb
        .from('notification_templates')
        .select('body_text, is_active')
        .eq('owner_id', item.owner_id)
        .eq('key', item.template_key)
        .eq('channel', 'sms')
        .eq('is_active', true)
        .is('deleted_at', null)
        .maybeSingle()
      if (ownerTpl) {
        template = ownerTpl
      } else {
        const { data: defaultTpl } = await tdb
          .from('notification_templates')
          .select('body_text')
          .eq('key', item.template_key)
          .eq('channel', 'sms')
          .eq('is_default', true)
          .eq('is_active', true)
          .is('deleted_at', null)
          .maybeSingle()
        template = defaultTpl
      }
      if (!template) {
        throw new Error(`Template SMS não encontrado: ${item.template_key}`)
      }

      // 5. Render the placeholders with the variables stored on the item.
      const vars = item.resolved_vars || {}
      const message = renderTemplate(template.body_text, vars)

      // 6. Optional per-tenant sender number, read from addon_credits.
      const { data: creditRow } = await admin
        .from('addon_credits')
        .select('from_number_override')
        .eq('tenant_id', tenantId)
        .eq('addon_type', 'sms')
        .maybeSingle()
      const fromOverride = creditRow?.from_number_override || null

      // 7. Send through Twilio, or through the mock sender in mock mode.
      let sendResult: { sid: string; status: string }
      if (isMockMode()) {
        sendResult = mockSend(item.recipient_address, message)
      } else {
        sendResult = await sendViaTwilio(item.recipient_address, message, fromOverride)
      }

      // 8. Success: mark the queue row as sent and write the log row.
      await tdb
        .from('notification_queue')
        .update({
          status: 'enviado',
          sent_at: new Date().toISOString(),
          provider_message_id: sendResult.sid,
        })
        .eq('id', item.id)
      await tdb.from('notification_logs').insert({
        ...logBase,
        resolved_message: message,
        resolved_vars: vars,
        status: 'sent',
        provider: 'twilio',
        provider_message_id: sendResult.sid,
        sent_at: new Date().toISOString(),
      })
      results.push({ id: item.id, status: 'enviado' })
    } catch (err) {
      // 9. Failure: back to 'pendente' with a linear backoff (2 min per
      //    attempt), or 'falhou' once the attempts are used up.
      const reason = err instanceof Error ? err.message : String(err)
      const attempts = item.attempts + 1
      const isExhausted = attempts >= maxAttempts
      const retryMs = attempts * 2 * 60 * 1000
      await tdb
        .from('notification_queue')
        .update({
          status: isExhausted ? 'falhou' : 'pendente',
          last_error: reason,
          next_retry_at: isExhausted ? null : new Date(Date.now() + retryMs).toISOString(),
        })
        .eq('id', item.id)
      await tdb.from('notification_logs').insert({
        ...logBase,
        status: 'failed',
        failure_reason: reason,
        failed_at: new Date().toISOString(),
      })
      results.push({ id: item.id, status: isExhausted ? 'falhou' : 'retry', error: reason })
    }
  }
  return results
}
// ── Main handler ───────────────────────────────────────────────
// HTTP entry point. With `tenant_id` in the JSON body only that tenant's queue
// is processed; otherwise every tenant schema returned by listTenantSchemas is
// swept, and a failure in one tenant is recorded without stopping the others.
Deno.serve(async (req: Request) => {
  // CORS preflight.
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders })
  }

  const jsonHeaders = { ...corsHeaders, 'Content-Type': 'application/json' }
  const admin = adminClient()

  // Single-tenant mode when body.tenant_id is present; sweep mode otherwise.
  let bodyTenantId: string | null = null
  try {
    const body = await req.json()
    bodyTenantId = body?.tenant_id ?? null
  } catch {
    // No body / invalid JSON → sweep mode.
  }

  const results: Result[] = []
  const errors: Array<{ tenantId: string; error: string }> = []
  try {
    if (bodyTenantId) {
      const schema = await schemaForTenant(admin, bodyTenantId)
      if (!schema) {
        return new Response(
          JSON.stringify({ error: `schema indisponível para tenant ${bodyTenantId}` }),
          { status: 404, headers: jsonHeaders }
        )
      }
      const tdb = await tenantDbForId(admin, bodyTenantId)
      results.push(...await processTenantQueue(admin, tdb, bodyTenantId))
    } else {
      const tenants = await listTenantSchemas(admin)
      for (const t of tenants) {
        try {
          const tdb = admin.schema(t.schema)
          results.push(...await processTenantQueue(admin, tdb, t.tenantId))
        } catch (e) {
          // Narrow first: a thrown non-Error has no `.message`, and an
          // undefined field would be dropped from the JSON response.
          const reason = e instanceof Error ? e.message : String(e)
          console.error(`[process-sms-queue] tenant ${t.tenantId} falhou:`, reason)
          errors.push({ tenantId: t.tenantId, error: reason })
        }
      }
    }
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err)
    return new Response(
      JSON.stringify({ error: reason }),
      { status: 500, headers: jsonHeaders }
    )
  }

  if (!results.length && !errors.length) {
    return new Response(
      JSON.stringify({ message: 'Nenhum SMS na fila', processed: 0 }),
      { status: 200, headers: jsonHeaders }
    )
  }

  // Summary counters by final item status.
  const sent = results.filter(r => r.status === 'enviado').length
  const failed = results.filter(r => r.status === 'falhou').length
  const noCredit = results.filter(r => r.status === 'sem_credito').length
  return new Response(
    JSON.stringify({ processed: results.length, sent, failed, no_credit: noCredit, details: results, tenantErrors: errors }),
    { status: 200, headers: jsonHeaders }
  )
})