F4 schema-per-tenant: edge functions roteiam pro schema do tenant

- _shared/tenant.ts: helper (adminClient, tenantDbForId, schemaForTenant,
  listTenantSchemas, resolveTenantByChannel, tenantSchemaName)
- _shared/whatsapp-hooks.ts: hooks de tabela tenant recebem tdb; RPCs de
  credito (deduct/add_whatsapp_credits) e tenant_members seguem em supa+p_tenant_id
- inbound (twilio/evolution): tenant_id da URL -> tdb pra conversation_messages
  e notification_channels
- crons de fila (process-notification/email/sms/whatsapp-queue): varrem
  listTenantSchemas e drenam a fila de cada schema (Q3: filas sao per-tenant);
  modo single-tenant se body.tenant_id vier
- crons reminders/checks (send-session-reminders, conversation-sla-check,
  whatsapp-heartbeat-check, convert-abandoned-intakes, sync-email-templates):
  loop por tenant
- routing por tenant_id (send-whatsapp-message, send-session-reminder-manual,
  twilio-provision, de/reactivate-channel, twilio-webhook): tenantDbForId;
  channel-actions sem tenant_id varrem schemas por channel_id
- asaas-*: tenant_id do body -> tdb; asaas-webhook fica global (whatsapp_credit_purchases)
- notification-webhook (Meta): resolve tenant via channel_routing por phone_number_id,
  fan-out por message_id quando nao resolve
- caller send-session-reminder-manual passa tenant_id (evento vive no schema)

Pendente: save-intake-progress e fluxos anon por token (decisao de roteamento)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Leonardo
2026-06-13 08:44:09 -03:00
parent ba8348d4a6
commit 9b21642e15
27 changed files with 1291 additions and 835 deletions
+92 -46
View File
@@ -4,20 +4,26 @@
|--------------------------------------------------------------------------
| Processa a notification_queue para channel = 'email'.
|
| schema-per-tenant: notification_queue/channels/logs e email_templates_tenant
| vivem no schema físico de cada tenant (SEM coluna tenant_id). O cron VARRE
| todos os tenants; se vier body.tenant_id, processa só aquele (modo single).
| email_templates_global é GLOBAL (admin/public).
|
| Fluxo por item:
| 1. Busca pendentes (channel='email', status='pendente', scheduled_at <= now)
| 2. Marca como 'processando' (lock otimista)
| 3. Busca canal SMTP em notification_channels
| 3. Busca canal SMTP em notification_channels (tdb)
| 4. Resolve template: COALESCE(tenant, global)
| 5. Renderiza variáveis e condicionais {{#if}}
| 6. Envia via SMTP (Deno raw TCP com STARTTLS)
| 7. Atualiza queue e insere em notification_logs
| 7. Atualiza queue e insere em notification_logs (tdb)
| 8. Em erro: retry com backoff ou marca 'falhou'
|--------------------------------------------------------------------------
*/
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'
import type { SupabaseClient } from 'https://esm.sh/@supabase/supabase-js@2'
import { SmtpClient } from 'https://deno.land/x/smtp@v0.7.0/mod.ts'
import { adminClient, listTenantSchemas, tenantDbForId, schemaForTenant } from '../_shared/tenant.ts'
const corsHeaders = {
'Access-Control-Allow-Origin': '*',
@@ -112,22 +118,15 @@ async function sendEmail(
return { messageId: `${Date.now()}-${Math.random().toString(36).slice(2, 8)}` }
}
// ── Main handler ───────────────────────────────────────────────
// ── Processa a fila de UM tenant ───────────────────────────────
Deno.serve(async (req: Request) => {
if (req.method === 'OPTIONS') {
return new Response('ok', { headers: corsHeaders })
}
const supabase = createClient(
Deno.env.get('SUPABASE_URL')!,
Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
)
type Result = { id: string; status: string; error?: string }
async function processTenantQueue(admin: SupabaseClient, tdb: SupabaseClient): Promise<Result[]> {
const now = new Date().toISOString()
// 1. Busca itens pendentes de email
const { data: items, error: fetchErr } = await supabase
// 1. Busca itens pendentes de email deste tenant
const { data: items, error: fetchErr } = await tdb
.from('notification_queue')
.select('*')
.eq('channel', 'email')
@@ -137,25 +136,14 @@ Deno.serve(async (req: Request) => {
.order('scheduled_at', { ascending: true })
.limit(20)
if (fetchErr) {
return new Response(
JSON.stringify({ error: fetchErr.message }),
{ status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
)
}
if (fetchErr) throw new Error(fetchErr.message)
if (!items || items.length === 0) return []
if (!items || items.length === 0) {
return new Response(
JSON.stringify({ message: 'Nenhum email na fila', processed: 0 }),
{ status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
)
}
const results: Array<{ id: string; status: string; error?: string }> = []
const results: Result[] = []
for (const item of items) {
// 2. Lock otimista — marca como processando
const { error: lockErr } = await supabase
const { error: lockErr } = await tdb
.from('notification_queue')
.update({ status: 'processando', attempts: item.attempts + 1 })
.eq('id', item.id)
@@ -167,8 +155,8 @@ Deno.serve(async (req: Request) => {
}
try {
// 3. Busca canal SMTP
const { data: channel, error: chErr } = await supabase
// 3. Busca canal SMTP (tdb)
const { data: channel, error: chErr } = await tdb
.from('notification_channels')
.select('credentials, sender_address, provider')
.eq('owner_id', item.owner_id)
@@ -181,24 +169,22 @@ Deno.serve(async (req: Request) => {
throw new Error('Canal SMTP não encontrado ou inativo para este owner')
}
// 4. Resolve template: tenant → global fallback
const { data: tenantTpl } = await supabase
// 4. Resolve template: tenant (tdb) → global (admin) fallback
const { data: tenantTpl } = await tdb
.from('email_templates_tenant')
.select('subject, body_html, body_text, enabled')
.eq('tenant_id', item.tenant_id)
.eq('owner_id', item.owner_id)
.eq('template_key', item.template_key)
.maybeSingle()
// Se tenant desabilitou o template → ignorar
if (tenantTpl && tenantTpl.enabled === false) {
await supabase
await tdb
.from('notification_queue')
.update({ status: 'ignorado' })
.eq('id', item.id)
await supabase.from('notification_logs').insert({
tenant_id: item.tenant_id,
await tdb.from('notification_logs').insert({
owner_id: item.owner_id,
queue_id: item.id,
agenda_evento_id: item.agenda_evento_id,
@@ -216,8 +202,8 @@ Deno.serve(async (req: Request) => {
continue
}
// Busca global
const { data: globalTpl } = await supabase
// Busca global (admin/public)
const { data: globalTpl } = await admin
.from('email_templates_global')
.select('subject, body_html, body_text')
.eq('key', item.template_key)
@@ -256,7 +242,7 @@ Deno.serve(async (req: Request) => {
)
// 7. Sucesso — atualiza queue
await supabase
await tdb
.from('notification_queue')
.update({
status: 'enviado',
@@ -266,8 +252,7 @@ Deno.serve(async (req: Request) => {
.eq('id', item.id)
// Insere log de sucesso
await supabase.from('notification_logs').insert({
tenant_id: item.tenant_id,
await tdb.from('notification_logs').insert({
owner_id: item.owner_id,
queue_id: item.id,
agenda_evento_id: item.agenda_evento_id,
@@ -297,7 +282,7 @@ Deno.serve(async (req: Request) => {
? null
: new Date(Date.now() + retryDelay).toISOString()
await supabase
await tdb
.from('notification_queue')
.update({
status: isExhausted ? 'falhou' : 'pendente',
@@ -307,8 +292,7 @@ Deno.serve(async (req: Request) => {
.eq('id', item.id)
// Log de falha
await supabase.from('notification_logs').insert({
tenant_id: item.tenant_id,
await tdb.from('notification_logs').insert({
owner_id: item.owner_id,
queue_id: item.id,
agenda_evento_id: item.agenda_evento_id,
@@ -326,6 +310,67 @@ Deno.serve(async (req: Request) => {
}
}
return results
}
// ── Main handler ───────────────────────────────────────────────
Deno.serve(async (req: Request) => {
if (req.method === 'OPTIONS') {
return new Response('ok', { headers: corsHeaders })
}
const admin = adminClient()
// Modo single-tenant se body.tenant_id; senão varre todos.
let bodyTenantId: string | null = null
try {
const body = await req.json()
bodyTenantId = body?.tenant_id ?? null
} catch {
// sem body / body inválido → modo varredura
}
const results: Result[] = []
const errors: Array<{ tenantId: string; error: string }> = []
try {
if (bodyTenantId) {
const schema = await schemaForTenant(admin, bodyTenantId)
if (!schema) {
return new Response(
JSON.stringify({ error: `schema indisponível para tenant ${bodyTenantId}` }),
{ status: 404, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
)
}
const tdb = await tenantDbForId(admin, bodyTenantId)
results.push(...await processTenantQueue(admin, tdb))
} else {
const tenants = await listTenantSchemas(admin)
for (const t of tenants) {
try {
const tdb = admin.schema(t.schema)
results.push(...await processTenantQueue(admin, tdb))
} catch (e) {
console.error(`[process-email-queue] tenant ${t.tenantId} falhou:`, e.message)
errors.push({ tenantId: t.tenantId, error: e.message })
}
}
}
} catch (err) {
return new Response(
JSON.stringify({ error: err.message }),
{ status: 500, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
)
}
if (!results.length && !errors.length) {
return new Response(
JSON.stringify({ message: 'Nenhum email na fila', processed: 0 }),
{ status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
)
}
const sent = results.filter(r => r.status === 'enviado').length
const failed = results.filter(r => r.status === 'falhou').length
const retried = results.filter(r => r.status === 'retry').length
@@ -339,6 +384,7 @@ Deno.serve(async (req: Request) => {
retried,
ignored,
details: results,
tenantErrors: errors,
}),
{ status: 200, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }
)