Skip to content

Arquitectura de Componentes

< Volver al indice | Siguiente: Base de Datos >


Documentacion retrospectiva - Actualizada a partir de codigo implementado el 2026-02-24. Fuente: Core/ValueObjects/, Core/Controllers/, Core/Services/, Routes/Jobs/.

Tabla de Contenidos


Value Objects

BackgroundJob

Ubicación: Core/ValueObjects/BackgroundJob.php

Namespace: App\Core\ValueObjects

Propósito: Representar un job en el sistema con todos sus metadatos

Propiedades (constructor con property promotion):

PropiedadTipoDescripcion
typestring (readonly)Tipo de job (ej: 'batch_invoicing')
userIdint (readonly)ID del usuario que creo el job
dbstring (readonly)Base de datos de la empresa (ej: empresa_xyz, empresa_xyz_p para modo prueba)
schemastring (readonly)Schema PostgreSQL donde ejecutar (CRITICO para multi-tenant)
nroSistemaint (readonly)Identificador del sistema desde JWT (Payload->sistema)
payloadarray (readonly)Datos necesarios para ejecutar el job
pruebabool (readonly)Flag de modo prueba (true cuando la conexion usa BD con sufijo _p)
idint|nullID unico del job (null si no persistido)
statusstringEstado: pending | running | completed | failed
resultarray|nullResultado del job (null si no termino)
errorstring|nullMensaje de error (null si no fallo)
createdAtstring|nullTimestamp de creacion
startedAtstring|nullTimestamp de inicio de ejecucion
completedAtstring|nullTimestamp de finalizacion
progressfloatProgreso de 0 a 100 (actualizado por el handler durante ejecucion)
retryCountintNumero de reintentos realizados (0 = primera ejecucion)
maxRetriesintMaximo de reintentos (default: 3)
nextRetryAtstring|nullTimestamp de cuando debe reintentarse (null = dispatch inmediato)

Constantes de estado:

  • STATUS_PENDING = 'pending'
  • STATUS_RUNNING = 'running'
  • STATUS_COMPLETED = 'completed'
  • STATUS_FAILED = 'failed'

Metodos:

  • isPending(): bool - Verifica si esta pendiente
  • isRunning(): bool - Verifica si esta ejecutandose
  • isCompleted(): bool - Verifica si termino exitosamente
  • isFailed(): bool - Verifica si fallo
  • getExecutionTime(): int|null - Duracion en segundos (entre startedAt y completedAt)
  • static fromRow(array $row): self - Construye un BackgroundJob desde una fila de BD (mapea nro_sistema, prueba, retry_count, etc.)

Notification

Ubicación: Core/ValueObjects/Notification.php

Namespace: App\Core\ValueObjects

Propósito: Representar notificación al usuario sobre resultado de job

Propiedades:

PropiedadTipoDescripción
idintID único de notificación
user_idintUsuario destinatario
typestringTipo: success | error | info
titlestringTítulo breve
messagestringMensaje detallado
metadataarrayDatos adicionales (ej: job_id, result)
is_readboolSi fue leída
created_atstringTimestamp de creación
read_atstring|nullTimestamp de lectura

Métodos:

  • markAsRead(): void - Marca como leída
  • isRead(): bool - Verifica si fue leída
  • getJobId(): int|null - Extrae job_id de metadata

Repositorios (Data Access Layer)

JobRepository

Ubicación: Core/Repositories/JobRepository.php

Namespace: App\Core\Repositories

Responsabilidades:

  • CRUD de jobs en tabla background_jobs
  • Queries específicas: jobs por usuario, jobs por estado
  • NO lógica de negocio (solo persistencia)

Métodos Principales:

create(BackgroundJob $job): int

Descripción: Insertar nuevo job en BD

SQL:

sql
INSERT INTO background_jobs (type, status, payload, user_id, db, schema, nro_sistema, prueba, created_at)
VALUES (:type, :status, :payload, :user_id, :db, :schema, :nro_sistema, :prueba, NOW())
RETURNING id

Retorna: ID del job creado


findById(int $id): BackgroundJob|null

Descripción: Obtener job por ID

SQL:

sql
SELECT * FROM background_jobs
WHERE id = :id
LIMIT 1

Retorna: BackgroundJob o null si no existe


update(BackgroundJob $job): void

Descripción: Actualizar job existente (estado, resultado, error)

SQL:

sql
UPDATE background_jobs SET
    status = :status,
    result = :result,
    error = :error,
    started_at = :started_at,
    completed_at = :completed_at
WHERE id = :id

countPendingByUser(int $userId): int

Descripción: Contar jobs pendientes de un usuario (DOS protection)

SQL:

sql
SELECT COUNT(*) FROM background_jobs
WHERE user_id = :user_id
  AND status = 'pending'

Retorna: Cantidad de jobs pendientes


findActiveByDbAndRootSchema(string $db, string $rootSchema, string $type): ?BackgroundJob

Descripcion: Buscar un job activo (pending o running) en el scope (type, db, root_schema), independientemente de user_id.

Parametros:

  • $db: Base de datos de la empresa (ej: bautista_1, bautista_1_p para modo prueba)
  • $rootSchema: Schema raiz derivado del schema del usuario (ej: suc0001 si el usuario esta en suc0001caja0001)
  • $type: Tipo de job

Patron de schema SQL generado por el metodo:

  • rootSchema = 'public':pattern = '^public$'
  • rootSchema = 'suc0001':pattern = '^suc0001(caja[0-9]{4})?$'

El patron cubre tanto el schema raiz como sus sub-schemas de caja: un job en suc0001caja0001 bloquea a un usuario en suc0001caja0002 porque ambos comparten el mismo rootSchema = 'suc0001'.

SQL:

sql
SELECT * FROM background_jobs
WHERE type   = :type
  AND db     = :db
  AND schema ~ :pattern
  AND status IN ('pending', 'running')
ORDER BY created_at DESC
LIMIT 1

Retorna: BackgroundJob o null si no hay ninguno activo en el scope.

Conexion: getDbal('ini') (consistente con todos los metodos de JobRepository)

Nota de seguridad: El valor de :pattern se pasa siempre como bind parametrizado. $rootSchema es derivado del campo schema validado por el CHECK constraint de la BD — nunca es input crudo del usuario.

Usado por: JobDispatcher::dispatch() para la guardia de deduplicacion scope-aware.


findStaleJobs(int $minutesThreshold): array

Descripción: Encontrar jobs "stale" (running > X minutos, probablemente crashed)

SQL:

sql
SELECT * FROM background_jobs
WHERE status = 'running'
  AND started_at < NOW() - INTERVAL ':minutes minutes'

Retorna: Array de BackgroundJob


NotificationRepository

Ubicación: Core/Repositories/NotificationRepository.php

Namespace: App\Core\Repositories

Responsabilidades:

  • CRUD de notificaciones en tabla notifications
  • Queries: notificaciones por usuario, no leídas

Métodos Principales:

create(Notification $notification): int

Descripción: Insertar nueva notificación

SQL:

sql
INSERT INTO notifications (user_id, type, title, message, metadata, is_read, created_at)
VALUES (:user_id, :type, :title, :message, :metadata, FALSE, NOW())
RETURNING id

Retorna: ID de notificación creada


findUnreadByUser(int $userId): array

Descripción: Obtener notificaciones no leídas de un usuario

SQL:

sql
SELECT * FROM notifications
WHERE user_id = :user_id
  AND is_read = FALSE
ORDER BY created_at DESC

Retorna: Array de Notification


markAsRead(int $id): void

Descripción: Marcar notificación como leída

SQL:

sql
UPDATE notifications SET
    is_read = TRUE,
    read_at = NOW()
WHERE id = :id

Servicios (Business Logic Layer)

JobDispatcher

Ubicación: Core/Services/JobDispatcher.php

Namespace: App\Core\Services

Responsabilidades:

  • Validar límites de jobs pendientes (DOS protection)
  • Crear job en BD
  • Lanzar worker en background con exec()
  • Retornar job ID al controller

Dependencias:

  • JobRepository - Para persistir jobs
  • ConnectionManager - Para acceso a DB
  • Configuración: MAX_PENDING_JOBS_PER_USER (default: 10)

Métodos Públicos:

dispatch(string $type, array $payload, int $userId, string $schema, string $db, int $nroSistema, bool $prueba): int

Descripcion: Despachar nuevo job para ejecucion asincrona

Parametros:

  • $type: Tipo de job (debe tener handler registrado)
  • $payload: Datos necesarios para ejecutar el job
  • $userId: Usuario que crea el job
  • $schema: Schema PostgreSQL para ejecucion (CRITICO multi-tenant)
  • $db: Base de datos de la empresa (con sufijo _p si es modo prueba)
  • $nroSistema: Identificador del sistema desde JWT
  • $prueba: Flag de modo prueba

Retorna: ID del job creado

Excepciones:

  • InvalidJobTypeException: Si tipo de job no tiene handler registrado → HTTP 422
  • JobAlreadyActiveForScopeException: Si ya existe un job activo para el mismo scope (type, db, root_schema) → HTTP 409
  • TooManyJobsException: Si usuario excede limite de jobs pendientes → HTTP 429

Flujo:

  1. Validar que tipo de job tenga handler registrado (JobExecutor::hasHandler())
  2. Extraer $rootSchema del schema del usuario: extractRootSchema($schema) (ej: suc0001caja0001suc0001)
  3. Verificar scope: findActiveByDbAndRootSchema($db, $rootSchema, $type) — si retorna un job, lanzar JobAlreadyActiveForScopeException
  4. Verificar limite DOS: countPendingByUser($userId) < MAX_PENDING_JOBS_PER_USER
  5. Crear BackgroundJob con status='pending', incluyendo nroSistema y prueba
  6. Persistir en BD (JobRepository::create)
  7. Lanzar worker: exec("php cli/background-worker.php {$jobId} {$schema} {$db} >> logs/background-jobs.log 2>&1 &")
  8. Retornar job ID

Nota Critica: El exec() con & al final NO espera al proceso hijo (non-blocking)

Diagrama ASCII de la guardia de dedup:

JobDispatcher::dispatch()

  ├─ 1. hasHandler(type)?                     → NO  → throw InvalidJobTypeException (422)

  ├─ 2. extractRootSchema($schema)            → $rootSchema

  ├─ 3. findActiveByDbAndRootSchema()         → found → throw JobAlreadyActiveForScopeException (409)
  │                                           → null  → continuar

  ├─ 4. countPendingByUser()                  → >= limit → throw TooManyJobsException (429)

  ├─ 5. JobRepository::create()              → $jobId

  └─ 6. exec(worker.php $jobId $schema $db)  → non-blocking


        return $jobId

Metodo privado extractRootSchema(string $schema): string:

Deriva el root schema de un schema PostgreSQL completo:

Schema de entradaroot_schema
suc0001suc0001
suc0001caja0001suc0001
suc0001caja0002suc0001
publicpublic

Implementacion: delega a ConnectionUtils::extractSucursalSchema($schema) — método estático del trait App\connection\ConnectionUtils que encapsula la extracción del schema raíz.


JobExecutor

Ubicación: Core/Services/JobExecutor.php

Namespace: App\Core\Services

Responsabilidades:

  • Registrar handlers disponibles (Strategy Pattern)
  • Ejecutar job con el handler correspondiente
  • Actualizar estado del job (running → completed/failed)
  • Crear notificación al usuario con resultado

Dependencias:

  • JobRepository - Para actualizar estado
  • NotificationRepository - Para crear notificación
  • ConnectionManager - Para multi-tenant schema setup
  • Array de handlers registrados

Métodos Públicos:

registerHandler(JobHandlerInterface $handler): void

Descripción: Registrar nuevo handler para un tipo de job

Parámetros:

  • $handler: Instancia de handler que implementa JobHandlerInterface

Flujo:

  1. Obtener tipo con $handler->getType()
  2. Registrar en array interno: $this->handlers[$type] = $handler

hasHandler(string $type): bool

Descripción: Verificar si existe handler para un tipo de job

Parámetros:

  • $type: Tipo de job

Retorna: true si existe handler registrado


execute(int $jobId): void

Descripción: Ejecutar job por ID completo (cargar, ejecutar, actualizar, notificar)

Parámetros:

  • $jobId: ID del job a ejecutar

Excepciones:

  • JobNotFoundException: Si job no existe
  • NoHandlerException: Si tipo de job no tiene handler
  • Cualquier exception lanzada por el handler

Flujo:

  1. Cargar job desde BD (JobRepository::findById)
  2. Validar que job esté en status='pending'
  3. Actualizar status='running', started_at=NOW()
  4. Configurar schema: ConnectionManager::setSearchPath($job->schema) (CRÍTICO)
  5. Obtener handler para $job->type
  6. Ejecutar: $result = $handler->handle($job->payload)
  7. Si OK:
    • Actualizar status='completed', result=$result, completed_at=NOW()
    • Crear notificación tipo='success'
  8. Si Exception y retryCount < maxRetries:
    • Incrementar retry_count
    • Calcular next_retry_at con exponential backoff
    • Actualizar status='pending', error=$exception->getMessage()
    • Self-dispatch: lanzar nuevo worker CLI con exec() programado para next_retry_at (el JobExecutor se encarga del re-dispatch, sin depender de un scheduler externo)
  9. Si Exception y retryCount >= maxRetries:
    • Actualizar status='failed', error=$exception->getMessage(), completed_at=NOW()
    • Crear notificación tipo='error'

Nota Crítica: El paso 4 (configurar schema) es CRÍTICO para multi-tenancy. Si se omite, el job ejecutará en el schema incorrecto.

Nota sobre retries: El retry automático es self-contained dentro de JobExecutor. Cada fallo con reintentos disponibles produce un re-dispatch inmediato del worker CLI. El daemon bin/worker.php actúa como safety-net (intervalo de 5 minutos) para reintentar jobs huérfanos cuyo self-dispatch falló.


NotificationService

Ubicación: Core/Services/NotificationService.php

Namespace: App\Core\Services

Responsabilidades:

  • Crear notificaciones de diferentes tipos
  • Consultar notificaciones no leídas
  • Marcar notificaciones como leídas

Dependencias:

  • NotificationRepository - Para persistir notificaciones

Métodos Públicos:

createFromJobResult(BackgroundJob $job): void

Descripción: Crear notificación basada en resultado de job

Parámetros:

  • $job: Job completado o fallido

Flujo:

  1. Determinar tipo según status del job:
    • completed → type='success'
    • failed → type='error'
  2. Generar título y mensaje apropiados
  3. Incluir metadata: ['job_id' => $job->id, 'job_type' => $job->type]
  4. Persistir con NotificationRepository::create

getUnreadByUser(int $nroSistema, int $userId, string $db, string $schema, bool $prueba): array

Descripcion: Obtener notificaciones no leidas de un usuario, filtradas por contexto multi-tenant

Parametros:

  • $nroSistema: Identificador del sistema
  • $userId: ID del usuario
  • $db: Base de datos (con sufijo _p si prueba)
  • $schema: Schema PostgreSQL
  • $prueba: Flag de modo prueba

Retorna: Array de Notification


markAsRead(int $notificationId, int $nroSistema, int $userId, string $db, string $schema, bool $prueba): bool

Descripcion: Marcar notificacion especifica como leida, validando ownership por contexto multi-tenant

Retorna: true si se encontro y marco como leida, false si no se encontro


RetryStrategy

Ubicacion: Core/Services/RetryStrategy.php

Namespace: App\Core\Services

Proposito: Encapsular la logica de reintento — cuanto esperar entre intentos, cuantos intentos maximo, y que excepciones son reintentables.

Dependencias:

  • Variable de entorno BACKGROUND_JOBS_MAX_RETRIES (default: 3)

Responsabilidades:

  • Calculo de delay: Implementa exponential backoff para el next_retry_at del job. El delay aumenta con cada intento (ej: 60s, 120s, 240s).
  • Max attempts: Lee el limite maximo desde BACKGROUND_JOBS_MAX_RETRIES del entorno.
  • Clasificacion de excepciones: Distingue entre excepciones reintentables (errores transitorios: timeout, conexion, API externa) y no reintentables (errores de dominio: validacion, datos incorrectos). Las excepciones no reintentables pasan directamente a failed sin consumir reintentos.

Usado por: JobRetryService para determinar si un job fallido debe reintentarse y cuando.


AlertService

Ubicacion: Core/Services/AlertService.php

Namespace: App\Core\Services

Proposito: Monitorear la tasa de fallos de jobs y enviar alertas webhook cuando supera el umbral configurado.

Dependencias:

  • Variable de entorno JOBS_ALERT_WEBHOOK_URL — URL del webhook receptor (Slack, PagerDuty, etc.)
  • JobRepository — Para consultar metricas de fallos recientes

Comportamiento:

  • Evalua la tasa de fallos en una ventana de tiempo reciente
  • Si la tasa supera el umbral configurado, envia un HTTP POST al webhook en JOBS_ALERT_WEBHOOK_URL
  • Si JOBS_ALERT_WEBHOOK_URL no esta configurado, la alerta se omite silenciosamente (solo logging)

Llamado por: JobRetryService despues de que un job agota todos sus reintentos (transicion a failed definitivo).


HealthChecker

Ubicacion: Core/Services/HealthChecker.php

Namespace: App\Core\Services

Proposito: Verificar el estado de salud del sistema de background jobs

Dependencias:

  • JobRepository - Para obtener metricas y stale jobs
  • ConnectionManager - Para verificar conectividad a BD

Checks realizados:

CheckQue verificaUmbrales
databaseConectividad y latencia (SELECT 1 en conexion principal)up / down
queueCantidad de jobs pendienteshealthy < 1000, degraded 1000-5000, unhealthy > 5000
stale_jobsJobs en running por mas de 60 minutoshealthy = 0, warning > 0

Estado overall: healthy > degraded > unhealthy (se usa el mas severo de todos los checks)


AdminPermissionChecker

Ubicacion: Core/Services/AdminPermissionChecker.php

Namespace: App\Core\Services

Proposito: Verificar si un usuario tiene permisos de administrador consultando rel_permisos_grupos en el schema empresa-level (public) via la conexion oficial.

Estado actual (2026-02-24): No se utiliza activamente. AdminJobController::isAdmin() retorna true para todos los usuarios autenticados, pendiente del semillado de rel_grupos_usuarios.


Handlers (Strategy Pattern)

JobHandlerInterface

Ubicación: Core/Interfaces/JobHandlerInterface.php

Namespace: App\Core\Interfaces

Propósito: Contrato que deben cumplir todos los handlers de jobs

Métodos:

php
interface JobHandlerInterface
{
    /**
     * Obtener tipo de job que maneja este handler
     */
    public function getType(): string;

    /**
     * Ejecutar job con payload dado
     *
     * @param array $payload Datos necesarios para ejecutar
     * @param callable|null $onProgress Callback opcional para reportar progreso (0.0 a 100.0)
     * @return array Resultado del job
     * @throws Exception Si falla la ejecución
     */
    public function handle(array $payload, ?callable $onProgress = null): array;
}

BatchInvoicingJobHandler (Ejemplo)

Ubicación: Ventas/Handlers/BatchInvoicingJobHandler.php

Namespace: App\Ventas\Handlers

Propósito: Handler para facturación masiva (ejemplo de implementación)

Type: 'batch_invoicing'

Payload esperado:

php
[
    'cliente_ids' => [1, 2, 3, 4, 5],
    'fecha' => '2026-02-05',
    'concepto' => 'Facturación mensual',
    'monto_base' => 1000.00
]

Result retornado:

php
[
    'facturas_creadas' => 5,
    'monto_total' => 5000.00,
    'factura_ids' => [101, 102, 103, 104, 105],
    'errores' => [] // Clientes que fallaron
]

Dependencias:

  • FacturaService - Service existente de facturación (NO modificado)

Patrón Wrapper:

El handler NO modifica FacturaService. En su lugar:

  1. Recibe payload con datos consolidados
  2. Itera sobre los items a procesar
  3. Para cada item, reconstruye el request DTO que espera FacturaService::insert()
  4. Delega a FacturaService::insert($dto) (método existente sin cambios)
  5. Acumula resultados y errores
  6. Retorna consolidado

Ventajas del patrón:

  • ✅ CERO impacto en código existente
  • ✅ Service puede usarse sincrónicamente (original) o asincrónicamente (via handler)
  • ✅ Feature flag controlado (rollback instantáneo)
  • ✅ Fácil testing (unit tests del handler con mock del service)

Controllers (HTTP Layer)

JobController

Ubicacion: Core/Controllers/JobController.php

Namespace: App\Core\Controllers

Responsabilidades:

  • Despachar jobs (POST)
  • Consultar estado de job (GET)
  • Listar notificaciones no leidas del usuario
  • Marcar notificaciones como leidas

Dependencias:

  • JobDispatcher - Para despachar jobs
  • JobRepository - Para consultar jobs
  • NotificationService - Para notificaciones
  • Payload - Datos del JWT (user_id, schema, sistema, db, cuit, etc.)
  • ConnectionManager - Para determinar modo prueba

Metodos:

dispatch(ServerRequestInterface $request, ResponseInterface $response, array $args): ResponseInterface

Endpoint: POST /backend/jobs/{type}

Path Params:

  • type: Tipo de job (ej: 'batch_invoicing')

Request Body:

json
{
  "payload": {
    "cliente_ids": [1, 2, 3],
    "fecha": "2026-02-05"
  },
  "schema_level": "sucursal"
}

Response (202 Accepted):

json
{
  "status": "success",
  "data": {
    "status": "accepted",
    "job_id": 123,
    "message": "Job creado, se ejecutará en segundo plano."
  }
}

Codigos de respuesta:

  • 202 Accepted: Job creado exitosamente
  • 409 Conflict: Ya existe un job activo para el mismo scope (type, db, root_schema). Ver JobAlreadyActiveForScopeException.
  • 422 Unprocessable Entity: Tipo de job no existe
  • 429 Too Many Requests: Usuario excede limite de jobs pendientes

Respuesta 409 (job bloqueado por scope):

json
{
  "error": "job_already_active_for_scope",
  "message": "Job type 'batch_invoicing' is already active for scope 'suc0001' in db 'bautista_1'",
  "scope": {
    "type": "batch_invoicing",
    "db": "bautista_1",
    "root_schema": "suc0001"
  }
}

Nota: El job_id del job bloqueante NO se incluye en la respuesta — el usuario que solicita no puede consultarlo (el ownership check retornaria 404).

GET /backend/jobs/active/{type}: Permanece por-usuario (no cambia por este change). Un usuario puede recibir data: null de este endpoint incluso cuando el scope-level lock le impide despachar — esto es comportamiento esperado.

Inyeccion de _context en payload (ver ADR-006):

Antes de despachar el job, JobController copia datos del JWT Payload al payload del job bajo la clave _context. Esto permite al worker CLI reconstruir el contexto de ejecucion sin JWT:

php
$jobPayload['_context'] = [
    'cuit'         => $this->payload->cuit,
    'nro_cliente'  => $this->payload->nro_cliente,
    'id_usuario'   => $this->payload->id_usuario,
    'id'           => $this->payload->id,
    'sistema'      => $this->payload->sistema,
    'schema_level' => $schemaLevel,
    'prueba'       => $prueba,
];

Propagacion multi-tenant: nroSistema se extrae de Payload->sistema, prueba se determina via ConnectionManager::isPruebaConnection(), db se deriva de Payload->db con sufijo _p si es modo prueba.


getStatus(...): ResponseInterface

Endpoint: GET /backend/jobs/{id}

Solo el usuario propietario puede consultarlo (owner check: $job->userId !== $userId retorna 404).


getNotifications(...): ResponseInterface

Endpoint: GET /backend/jobs/notifications

Retorna notificaciones no leidas del usuario actual, filtradas por nroSistema, userId, db, schema y prueba.


markNotificationRead(...): ResponseInterface

Endpoint: PATCH /backend/jobs/notifications/{id}/read

Marca notificacion como leida. Retorna 200 con {"status": "success"} o 404 si no se encuentra.


JobStreamController

Ubicacion: Core/Controllers/JobStreamController.php

Namespace: App\Core\Controllers

Proposito: Endpoint SSE para recibir actualizaciones en tiempo real

Endpoint: GET /backend/jobs/{id}/stream

Dependencias:

  • JobRepository - Para cargar el job
  • Payload - Para owner check
  • ConnectionManager - Para obtener PDO nativo (LISTEN)
  • LoggerInterface - Para logging de eventos SSE

Response: Server-Sent Events (SSE)

Event types:

  • status_changed: Cuando el status del job cambia (pending -> running -> completed/failed)
  • progress_updated: Cuando el progreso cambia (progress < 100)
  • error: Error al conectar al stream

Comportamiento:

  1. Si el job ya termino (completed o failed): retorna un unico evento status_changed como respuesta HTTP normal (sin abrir LISTEN)
  2. Si el job esta activo: abre conexion SSE con PostgreSQL LISTEN "job_updates_{schema}_{id}"
  3. Envia estado actual inmediatamente como primer evento
  4. Heartbeat cada ~20s para mantener conexion viva
  5. Timeout configurable via JOB_STREAM_MAX_SECONDS (default: 300s)
  6. Cierra stream cuando job termina o se alcanza el timeout

Canal PostgreSQL NOTIFY: job_updates_{schema}_{id} (ej: job_updates_suc0001_123). Incluye el schema para evitar colision cross-tenant, ya que los IDs auto-increment pueden coincidir entre schemas diferentes.

Nota sobre uso en frontend: El endpoint esta completamente implementado en backend. El frontend actualmente usa polling (GET /backend/jobs/{id}) por limitaciones de EventSource que no soporta headers custom de autorizacion (cross-domain).


AdminJobController

Ubicacion: Core/Controllers/AdminJobController.php

Namespace: App\Core\Controllers

Proposito: Operaciones de administracion de background jobs (listar fallidos, reintentar, eliminar)

Dependencias:

  • JobRepository - Para queries de jobs
  • JobDispatcher - Para redespachar jobs
  • Payload - Para datos del JWT
  • ConnectionManager - Para acceso a BD
  • AdminPermissionChecker - Para verificar permisos admin

Autenticacion: JWT + verificacion de permiso admin. Actualmente (2026-02-24) isAdmin() retorna true para todos los usuarios autenticados, pendiente del semillado de rel_grupos_usuarios.

Endpoints:

  • GET /backend/jobs/admin/failed - Lista jobs fallidos con paginacion
  • GET /backend/jobs/admin/{id} - Obtiene job por ID (cualquier estado)
  • POST /backend/jobs/admin/{id}/retry - Reintenta job fallido (solo estado failed)
  • DELETE /backend/jobs/admin/{id} - Elimina job permanentemente (hard delete)

Serializacion admin: Incluye campos adicionales vs el endpoint de usuario: user_id, db, schema, progress, retry_count, max_retries, next_retry_at.

Ver 03-api-endpoints.md para detalle completo de request/response.


MonitoringController

Ubicacion: Core/Controllers/MonitoringController.php

Namespace: App\Core\Controllers

Proposito: Endpoints de monitoreo para health checks y metricas Prometheus

Dependencias:

  • JobRepository - Para obtener metricas
  • HealthChecker - Para ejecutar health checks

Autenticacion: Bearer token estatico via MetricsAuthMiddleware (NO JWT)

Endpoints:

  • GET /backend/jobs/health - Health check JSON (200 si healthy/degraded, 503 si unhealthy)
  • GET /backend/jobs/metrics - Metricas en formato Prometheus text format (text/plain; version=0.0.4)

Ver 03-api-endpoints.md para detalle completo de response.


Middleware

MetricsAuthMiddleware

Ubicacion: Middleware/MetricsAuthMiddleware.php

Namespace: App\Middleware

Proposito: Autenticar endpoints de monitoring con token estatico en lugar de JWT

Header esperado: Authorization: Bearer {METRICS_SECRET}

Comportamiento:

  • Si METRICS_SECRET no esta configurado en .env: retorna 503 Service Unavailable
  • Si el header no coincide con el secret: retorna 401 Unauthorized
  • Si coincide: pasa el request al handler

Aplicado a: GET /backend/jobs/health y GET /backend/jobs/metrics


CLI Workers

background-worker.php

Ubicacion: cli/background-worker.php

Proposito: Script CLI que ejecuta un job especifico

Uso:

bash
php cli/background-worker.php {job_id} {schema} {db}

Argumentos CLI:

  • job_id (int): ID del job a ejecutar
  • schema (string): Schema PostgreSQL del job
  • db (string): Base de datos (ya con sufijo _p si es modo prueba)

Flujo (Two-Phase Bootstrap) -- Ver ADR-006:

Fase 1 -- Lectura minima del job:

  1. Conexion PDO minima (sin DI container)
  2. Leer payload del job por ID
  3. Extraer _context del payload (cuit, nro_cliente, id_usuario, id, sistema, schema_level, prueba)

Fase 2 -- Bootstrap completo con contexto: 4. Construir objeto Payload sintetico desde _context 5. Bootstrapear DI container completo con Payload inyectado 6. Obtener JobExecutor del container 7. Ejecutar: $executor->execute($jobId) 8. Exit code: 0 si OK, 1 si error

Ejecucion en background:

bash
# Lanzado por JobDispatcher con exec()
php cli/background-worker.php 123 suc0001 empresa_xyz >> logs/background-jobs.log 2>&1 &

# Lanzado por AdminJobController::retry() para reintentos manuales
php cli/background-worker.php 123 suc0001 empresa_xyz >> logs/background-jobs.log 2>&1 &

Logging:

  • Log a archivo: logs/background-jobs.log
  • Structured logging con context: ['job_id' => 123, 'type' => 'batch_invoicing']

bootstrap-cli.php

Ubicacion: cli/bootstrap-cli.php

Proposito: Bootstrap del sistema sin HTTP (para CLI scripts)

Diferencias con bootstrap HTTP:

  • NO carga Slim App
  • NO carga routes
  • SI carga DI container (Fase 2, con Payload sintetico)
  • SI carga ConnectionManager
  • SI carga configuracion

Nota: El bootstrap se ejecuta en Fase 2 del worker, despues de que Fase 1 haya extraido _context del job payload. El Payload sintetico (construido desde _context) se inyecta en el container para que servicios como BatchInvoicingAuditService y ArcaClientFactory tengan acceso a cuit, nro_cliente, etc. Ver ADR-006.


NOTA IMPORTANTE: Esta documentacion fue generada/actualizada a partir del codigo implementado. Validar con stakeholders antes de considerar final. Ultima verificacion contra codigo: 2026-02-27 (scope-lock guard, extractRootSchema, JobAlreadyActiveForScopeException).


< Volver al indice | Siguiente: Base de Datos >