Skip to main content

Ports & Adapters Architecture

Overview

The Ominis Cluster Manager implements Hexagonal Architecture (also known as Ports and Adapters) to achieve clean separation between business logic and infrastructure concerns. This pattern provides testability, flexibility, and maintainability by defining clear boundaries between application core and external systems.

Key Benefit

Routes depend only on port interfaces, never on concrete implementations. This enables unit testing without infrastructure, easy substitution of backends, and clear architectural boundaries.

What is Hexagonal Architecture?

Hexagonal Architecture organizes code into three layers:

  1. Application Core (center of hexagon): Business logic and port interfaces
  2. Ports (hexagon edges): Abstract interfaces defining what the application needs
  3. Adapters (outside hexagon): Concrete implementations connecting to infrastructure
┌────────────────────────────────────────────────────────────────┐
│ External Systems Layer │
│ │
│ Kubernetes PostgreSQL FreeSWITCH XML-RPC n8n │
└───────────────────────┬────────────────────────────────────────┘

│ Implemented by
v
┌────────────────────────────────────────────────────────────────┐
│ Adapters Layer │
│ (Infrastructure Integration) │
│ │
│ K8sQueueOrchestrator DatabaseCallcenterAdapter │
│ XMLRPCTelephonyAdapter XMLCURLDirectoryAdapter │
│ IVRSocketHandler OpenAITTSAdapter │
└───────────────────────┬────────────────────────────────────────┘

│ Implements
v
┌────────────────────────────────────────────────────────────────┐
│ Application Core │
│ (Port Interfaces) │
│ │
│ QueueOrchestratorPort CallcenterPort │
│ TelephonyPort DirectoryPort │
└───────────────────────┬────────────────────────────────────────┘

│ Depends on (via DI)
v
┌────────────────────────────────────────────────────────────────┐
│ HTTP Layer │
│ (FastAPI Routes) │
│ │
│ /v1/queues /v1/agents /v1/telephony /v1/directory │
└────────────────────────────────────────────────────────────────┘

Why Ports & Adapters?

Problems Before Ports & Adapters

The original architecture had tight coupling issues:

# OLD: Tightly coupled route (anti-pattern)
from container_queue_manager import queue_manager # Global singleton

@router.post("/queues")
def create_queue(queue: QueueCreate):
# Directly coupled to specific implementation
result = queue_manager.create_queue(queue.name)
return result

Issues:

  • ❌ Routes directly import concrete implementations
  • ❌ Testing requires full infrastructure (Docker/Kubernetes)
  • ❌ Switching backends requires changing route code
  • ❌ Circular dependencies between modules
  • ❌ Business logic mixed with infrastructure

Benefits After Ports & Adapters

# NEW: Decoupled route with ports
from core.ports.orchestrator import QueueOrchestratorPort
from core.di import get_orchestrator

@router.post("/queues")
async def create_queue(
queue: QueueCreate,
orchestrator: QueueOrchestratorPort = Depends(get_orchestrator)
):
# Decoupled from implementation details
result = await orchestrator.create_queue(queue)
return result

Benefits:

  • ✅ Routes depend only on abstract interfaces
  • ✅ Unit testing with mock adapters (no infrastructure needed)
  • ✅ Easy backend substitution (Kubernetes ↔ Docker)
  • ✅ Clear separation of concerns
  • ✅ No circular dependencies
  • ✅ Progressive migration path

Port Interfaces

Ports define what the application needs without specifying how it's implemented. All ports are abstract base classes in core/ports/.

QueueOrchestratorPort

Purpose: Queue lifecycle orchestration across deployment backends (Kubernetes, Docker)

Location: core/ports/orchestrator.py

Key Methods:

class QueueOrchestratorPort(ABC):
"""Port for queue orchestration across deployment backends."""

@abstractmethod
async def create_queue(
self, queue_data: QueueCreate, gateway_password: str = None
) -> QueueModel:
"""Create a new FreeSWITCH queue deployment"""

@abstractmethod
def get_all_queues(self) -> List[QueueModel]:
"""List all queue deployments"""

@abstractmethod
def get_queue(self, queue_name: str) -> Optional[QueueModel]:
"""Get specific queue deployment"""

@abstractmethod
def update_queue(
self, queue_name: str, queue_data: QueueUpdate
) -> Optional[QueueModel]:
"""Update queue configuration"""

@abstractmethod
async def delete_queue(self, queue_name: str) -> bool:
"""Delete queue deployment"""

Use Cases:

  • Queue pod creation/deletion
  • Queue configuration updates
  • Queue status queries
  • Multi-backend deployment (K8s, Docker)

TelephonyPort

Purpose: Call control operations via FreeSWITCH XML-RPC

Location: core/ports/telephony.py

Key Methods:

class TelephonyPort(ABC):
"""Port for telephony operations via XML-RPC."""

@abstractmethod
async def originate_call(
self,
aleg_endpoint: str,
bleg_application: str,
bleg_args: Optional[str] = None,
aleg_variables: Optional[Dict[str, str]] = None,
timeout: int = 60,
) -> Optional[str]:
"""Originate a two-legged call"""

@abstractmethod
async def hangup_call(self, uuid: str, cause: str = "NORMAL_CLEARING") -> bool:
"""Hangup active call"""

@abstractmethod
async def transfer_call(
self, uuid: str, destination: str, dialplan: str = "XML", context: str = "default"
) -> bool:
"""Transfer call to new destination"""

@abstractmethod
async def bridge_calls(self, uuid1: str, uuid2: str) -> bool:
"""Bridge two active calls"""

@abstractmethod
async def get_channels(self) -> List[Dict]:
"""Get all active channels"""

@abstractmethod
async def get_channel_info(self, uuid: str) -> Optional[Dict[str, str]]:
"""Get specific channel information"""

# Plus: play_file, record_call, set_variable, hold_call, mute_call, etc.

Use Cases:

  • Originating outbound calls
  • Call transfer and bridge operations
  • Call recording and playback
  • Channel monitoring and control
  • DTMF handling

CallcenterPort

Purpose: mod_callcenter operations (agents, tiers, members, queue lifecycle)

Location: core/ports/callcenter.py

Key Methods:

class CallcenterPort(ABC):
"""Port for callcenter operations (mod_callcenter management)."""

# Agent Operations (Database-backed)
@abstractmethod
async def create_agent(self, agent: "AgentCreate") -> "Agent":
"""Create a new agent in the database"""

@abstractmethod
async def list_agents(
self, queue_name: Optional[str] = None, status: Optional[str] = None
) -> List["Agent"]:
"""List agents with optional filtering"""

@abstractmethod
async def update_agent_status(self, name: str, status: str) -> "Agent":
"""Update agent status (Available, On Break, etc.)"""

# Tier Operations (Database-backed)
@abstractmethod
async def create_tier(self, tier: "TierCreate") -> "Tier":
"""Create tier assignment (agent to queue)"""

@abstractmethod
async def list_tiers(
self, queue: Optional[str] = None, agent: Optional[str] = None
) -> List["Tier"]:
"""List tiers with optional filtering"""

# Member Operations (Database-backed, read-only)
@abstractmethod
async def list_members(
self, queue: Optional[str] = None, state: Optional[str] = None
) -> List["Member"]:
"""List members (callers) with optional filtering"""

@abstractmethod
async def get_queue_count(self, queue: str) -> Dict[str, int]:
"""Get queue member counts by state"""

# Queue Lifecycle Operations (XML-RPC-backed)
@abstractmethod
async def load_queue(self, queue_name: str) -> str:
"""Load a queue via XML-RPC command"""

@abstractmethod
async def get_queue_status(self, queue_name: str) -> Dict[str, str]:
"""Get comprehensive real-time queue status"""

Backend Strategy: Hybrid

  • Database: Agent, tier, member CRUD operations
  • XML-RPC: Queue lifecycle (load/unload/reload)

Use Cases:

  • Agent management (create, update status, assign to queues)
  • Tier management (assign agents to queues with priority)
  • Member monitoring (track caller queue position)
  • Queue status queries

DirectoryPort

Purpose: FreeSWITCH user provisioning via mod_xml_curl

Location: core/ports/directory.py

Key Methods:

class DirectoryPort(ABC):
"""Port for FreeSWITCH directory/user provisioning."""

@abstractmethod
async def get_user_xml(
self, username: str, domain: str, purpose: str
) -> Optional[str]:
"""Generate FreeSWITCH directory XML for user lookup"""

@abstractmethod
async def reload_profile(self, profile_name: str) -> bool:
"""Reload Sofia profile via XML-RPC"""

Authentication Model: Hybrid

  • Cluster IPs (in ACL): Blind registration, no auth needed
  • External IPs (not in ACL): Challenged, authenticated via XML-CURL

Use Cases:

  • Dynamic user authentication (external SIP clients)
  • Extension provisioning from PostgreSQL
  • Sofia profile reloading after config changes

Adapter Implementations

Adapters are concrete implementations of ports that connect to real infrastructure. Located in adapters/.

K8sQueueOrchestrator

Implements: QueueOrchestratorPort

Backend: Kubernetes API (server-side apply)

Location: adapters/orchestrator_k8s.py

Key Features:

  • YAML-based queue deployment templates
  • Server-side apply for declarative updates
  • Namespace-scoped queue management
  • Kubernetes Service and Deployment orchestration
  • Pod restart capability (delete pod, Deployment recreates)

Example Usage:

orchestrator = K8sQueueOrchestrator(namespace="client-demo-client")

# Create queue deployment
queue = await orchestrator.create_queue(
QueueCreate(
name="support",
strategy="longest-idle-agent",
max_wait_time=300
)
)

# List all queues
queues = orchestrator.get_all_queues()

# Delete queue
await orchestrator.delete_queue("support")

Template Variables:

  • queue_name, namespace, strategy
  • moh_sound, announce_frequency, max_wait_time
  • global_codec_prefs, outbound_codec_prefs
  • registrar_host, registrar_port, gateway_password

XMLRPCTelephonyAdapter

Implements: TelephonyPort

Backend: FreeSWITCH XML-RPC API

Location: adapters/telephony_xml_rpc.py

Key Features:

  • Connection pooling with semaphore-based rate limiting
  • Automatic retry logic with configurable attempts
  • Timeout handling per operation
  • Async/await support for all operations

Concurrency Control:

class XMLRPCTelephonyAdapter(TelephonyPort):
def __init__(self):
self._settings = get_settings()
self._semaphore = asyncio.Semaphore(
self._settings.XMLRPC_MAX_CONCURRENCY
)

Retry Pattern:

async def originate_call(self, aleg_endpoint: str, ...) -> Optional[str]:
client = await self._client()
attempts = max(1, self._settings.XMLRPC_MAX_RETRIES + 1)
last_error: Optional[Exception] = None

for _ in range(attempts):
try:
async with self._limited():
return await asyncio.wait_for(
client.originate_call(...),
timeout=self._settings.XMLRPC_TIMEOUT_SECONDS,
)
except Exception as e:
last_error = e

raise last_error

DatabaseCallcenterAdapter

Implements: CallcenterPort

Backend: PostgreSQL (asyncpg) + FreeSWITCH XML-RPC

Location: adapters/callcenter_db.py

Key Features:

  • Connection pooling with asyncpg
  • Direct database access for agents/tiers/members
  • XML-RPC fallback for queue lifecycle operations
  • Hybrid backend strategy (database + XML-RPC)

Connection Management:

class DatabaseCallcenterAdapter(CallcenterPort):
async def connect(self):
"""Create database connection pool."""
self.pool = await asyncpg.create_pool(
host=self.db_host,
port=self.db_port,
user=self.db_user,
password=self.db_password,
database=self.db_name,
min_size=2,
max_size=10,
command_timeout=5,
timeout=10,
)

async def disconnect(self):
"""Close database connection pool."""
if self.pool:
await self.pool.close()

Backend Strategy:

  • PostgreSQL: Agent, tier, member CRUD (fast, consistent)
  • XML-RPC: Queue lifecycle commands (load/unload/reload)

XMLCURLDirectoryAdapter

Implements: DirectoryPort

Backend: PostgreSQL (extensions table) + FreeSWITCH XML-RPC

Location: adapters/directory_xmlcurl.py

Key Features:

  • Dynamic user XML generation from PostgreSQL
  • Support for call forwarding, voicemail, variables
  • Profile reload via XML-RPC
  • "Not found" XML responses

XML Generation:

async def get_user_xml(
self, username: str, domain: str, purpose: str
) -> Optional[str]:
# Fetch extension from PostgreSQL
extension = await self.repo.get_by_number(username)

if not extension or extension.status.value != "active":
return self._generate_not_found_xml()

return self._generate_user_xml(extension, domain)

Generated XML Structure:

<document type="freeswitch/xml">
<section name="directory">
<domain name="domain.com">
<user id="1001">
<params>
<param name="password" value="secret123"/>
<param name="dial-string" value="..."/>
</params>
<variables>
<variable name="user_context" value="default"/>
<variable name="effective_caller_id_name" value="John Doe"/>
<variable name="email" value="john@example.com"/>
</variables>
</user>
</domain>
</section>
</document>

Dependency Injection

DI Module: core/di.py

The DI container wires ports to adapters at runtime, providing singleton instances to routes via FastAPI's dependency injection.

DI Container Pattern

from functools import lru_cache
from typing import Optional

from adapters.orchestrator_k8s import K8sQueueOrchestrator
from adapters.telephony_xml_rpc import XMLRPCTelephonyAdapter
from adapters.callcenter_db import DatabaseCallcenterAdapter
from adapters.directory_xmlcurl import XMLCURLDirectoryAdapter

from core.ports.orchestrator import QueueOrchestratorPort
from core.ports.telephony import TelephonyPort
from core.ports.callcenter import CallcenterPort
from core.ports.directory import DirectoryPort


@lru_cache(maxsize=1)
def get_orchestrator() -> QueueOrchestratorPort:
"""Get queue orchestrator singleton."""
namespace = os.getenv("KUBERNETES_NAMESPACE", "demo-client")
return K8sQueueOrchestrator(namespace=namespace)


@lru_cache(maxsize=1)
def get_telephony() -> TelephonyPort:
"""Get telephony adapter singleton."""
return XMLRPCTelephonyAdapter()


@lru_cache(maxsize=1)
def get_directory() -> DirectoryPort:
"""Get directory adapter singleton."""
return XMLCURLDirectoryAdapter()


# Global adapter instance for lifecycle management
_callcenter_adapter: Optional[DatabaseCallcenterAdapter] = None


def get_callcenter() -> CallcenterPort:
"""Get callcenter adapter (database-backed)."""
global _callcenter_adapter
if _callcenter_adapter is None:
_callcenter_adapter = DatabaseCallcenterAdapter()
return _callcenter_adapter


async def initialize_callcenter():
"""Initialize callcenter adapter database connection pool."""
adapter = get_callcenter()
if isinstance(adapter, DatabaseCallcenterAdapter):
await adapter.connect()


async def shutdown_callcenter():
"""Shutdown callcenter adapter and close database connections."""
global _callcenter_adapter
if _callcenter_adapter:
await _callcenter_adapter.disconnect()
_callcenter_adapter = None

Lifecycle Management

Application Startup:

@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize adapters with connection pools
await initialize_callcenter()
yield
# Shutdown adapters and close connections
await shutdown_callcenter()

app = FastAPI(lifespan=lifespan)

Usage in Routes

Dependency Injection via FastAPI:

from fastapi import APIRouter, Depends
from core.ports.orchestrator import QueueOrchestratorPort
from core.di import get_orchestrator

router = APIRouter(prefix="/v1/queues")

@router.post("/queues")
async def create_queue(
queue: QueueCreate,
orchestrator: QueueOrchestratorPort = Depends(get_orchestrator)
):
"""Create queue - decoupled from implementation."""
result = await orchestrator.create_queue(queue)
return {"queue": result}

@router.get("/queues")
def list_queues(
orchestrator: QueueOrchestratorPort = Depends(get_orchestrator)
):
"""List queues - works with any orchestrator backend."""
queues = orchestrator.get_all_queues()
return {"queues": queues}

Dependency Flow Diagram

┌─────────────────────────────────────────────────────────────┐
│ FastAPI Application │
│ │
│ @asynccontextmanager │
│ async def lifespan(app): │
│ await initialize_callcenter() # Connect DB pool │
│ yield │
│ await shutdown_callcenter() # Close connections │
└──────────────────────────┬──────────────────────────────────┘

│ Calls
v
┌─────────────────────────────────────────────────────────────┐
│ Dependency Injection │
│ (core/di.py) │
│ │
│ get_orchestrator() → K8sQueueOrchestrator │
│ get_telephony() → XMLRPCTelephonyAdapter │
│ get_callcenter() → DatabaseCallcenterAdapter │
│ get_directory() → XMLCURLDirectoryAdapter │
└──────────────────────────┬──────────────────────────────────┘

│ Provides
v
┌─────────────────────────────────────────────────────────────┐
│ FastAPI Routes │
│ │
│ @router.post("/queues") │
│ def create_queue( │
│ orchestrator: QueueOrchestratorPort = Depends(...) │
│ ) │
└─────────────────────────────────────────────────────────────┘

Testing Benefits

The primary benefit of Ports & Adapters is testability without infrastructure.

Unit Testing with Mock Adapters

Before (tightly coupled):

def test_create_queue():
# ❌ Requires Docker/Kubernetes running
response = client.post("/v1/queues", json={"name": "support"})
assert response.status_code == 200

After (with ports):

from unittest.mock import Mock
from core.ports.orchestrator import QueueOrchestratorPort

def test_create_queue():
# ✅ Mock adapter, no infrastructure needed
mock_orchestrator = Mock(spec=QueueOrchestratorPort)
mock_orchestrator.create_queue.return_value = Queue(
name="support",
strategy="longest-idle-agent"
)

# Inject mock via dependency override
app.dependency_overrides[get_orchestrator] = lambda: mock_orchestrator

response = client.post("/v1/queues", json={"name": "support"})

assert response.status_code == 200
assert response.json()["queue"]["name"] == "support"
mock_orchestrator.create_queue.assert_called_once()

Contract Tests for Adapters

Ensure adapters correctly implement port interfaces:

import pytest
from core.ports.telephony import TelephonyPort
from adapters.telephony_xml_rpc import XMLRPCTelephonyAdapter

@pytest.mark.asyncio
async def test_telephony_adapter_implements_port():
"""Verify adapter implements all port methods."""
adapter = XMLRPCTelephonyAdapter()

# Check adapter is instance of port
assert isinstance(adapter, TelephonyPort)

# Check all abstract methods are implemented
assert hasattr(adapter, 'originate_call')
assert hasattr(adapter, 'hangup_call')
assert hasattr(adapter, 'transfer_call')
# ... etc.

Fake Implementations for Integration Tests

Create fake adapters for integration testing without external dependencies:

class FakeCallcenterAdapter(CallcenterPort):
"""In-memory fake for integration testing."""

def __init__(self):
self.agents = {}
self.tiers = []
self.members = []

async def create_agent(self, agent: AgentCreate) -> Agent:
if agent.name in self.agents:
raise ValueError(f"Agent {agent.name} already exists")

new_agent = Agent(**agent.dict(), status="Logged Out")
self.agents[agent.name] = new_agent
return new_agent

async def list_agents(self, **filters) -> List[Agent]:
return list(self.agents.values())

# ... implement other methods ...

Usage in Integration Tests:

def test_agent_workflow():
# Use fake adapter for fast integration tests
app.dependency_overrides[get_callcenter] = lambda: FakeCallcenterAdapter()

# Create agent
response = client.post("/v1/agents", json={"name": "john"})
assert response.status_code == 200

# List agents
response = client.get("/v1/agents")
assert len(response.json()["agents"]) == 1

Migration Strategy

Ports & Adapters was adopted incrementally to avoid big-bang rewrites.

Migration Steps

  1. Create Port Interface (core/ports/)

    • Define abstract methods for domain operations
    • Use type hints for clear contracts
  2. Create Adapter (adapters/)

    • Implement port interface
    • Wrap existing infrastructure code
  3. Add DI Factory (core/di.py)

    • Create singleton factory function
    • Use @lru_cache for performance
  4. Update Routes (one at a time)

    • Replace direct imports with DI
    • Use Depends(get_port) pattern
  5. Add Tests

    • Unit tests with mock adapters
    • Contract tests for adapter compliance

Migration Status

PortStatusRoutes Migrated
QueueOrchestratorPort/v1/queues
CallcenterPort/v1/agents, /v1/tiers, /v1/members
TelephonyPort/v1/telephony
DirectoryPort/v1/freeswitch/directory

Pending Work

  • ⏳ Migrate campaign operations to TelephonyPort
  • ⏳ Add IVROrchestratorPort for IVR pod management
  • ⏳ Create test doubles (fakes) for common scenarios
  • ⏳ Document port design guidelines

Best Practices

Port Design

  1. Abstract, Not Implementation-Specific

    # ✅ Good: Abstract operation
    async def create_queue(self, queue: QueueCreate) -> Queue

    # ❌ Bad: K8s-specific
    async def create_k8s_deployment(self, yaml: str) -> Deployment
  2. Return Domain Models, Not Infrastructure Objects

    # ✅ Good: Domain model
    async def get_queue(self, name: str) -> Optional[Queue]

    # ❌ Bad: Kubernetes object
    async def get_deployment(self, name: str) -> V1Deployment
  3. Use Type Hints

    # ✅ Good: Clear contract
    async def list_agents(
    self,
    queue_name: Optional[str] = None,
    status: Optional[str] = None
    ) -> List[Agent]

Adapter Implementation

  1. Implement All Methods

    • Never leave abstract methods unimplemented
    • Raise NotImplementedError with context if truly not supported
  2. Handle Errors Gracefully

    try:
    result = await self._perform_operation()
    return result
    except SpecificInfraError as e:
    logger.error(f"Operation failed: {e}")
    raise RuntimeError(f"Infrastructure error: {e}")
  3. Log at Adapter Level

    • Log infrastructure operations (connection, retry, error)
    • Don't log in port interfaces (too abstract)
  4. Use Connection Pooling

    async def connect(self):
    """Initialize connection pool on startup."""
    self.pool = await create_pool(...)

    async def disconnect(self):
    """Close connections on shutdown."""
    if self.pool:
    await self.pool.close()

Dependency Injection

  1. Use @lru_cache for Stateless Adapters

    @lru_cache(maxsize=1)
    def get_telephony() -> TelephonyPort:
    return XMLRPCTelephonyAdapter()
  2. Manual Singleton for Stateful Adapters

    _callcenter_adapter: Optional[DatabaseCallcenterAdapter] = None

    def get_callcenter() -> CallcenterPort:
    global _callcenter_adapter
    if _callcenter_adapter is None:
    _callcenter_adapter = DatabaseCallcenterAdapter()
    return _callcenter_adapter
  3. Initialize in Lifespan

    @asynccontextmanager
    async def lifespan(app: FastAPI):
    await initialize_adapters()
    yield
    await shutdown_adapters()

Testing

  1. Mock at Port Level

    mock_orchestrator = Mock(spec=QueueOrchestratorPort)
    app.dependency_overrides[get_orchestrator] = lambda: mock_orchestrator
  2. Test Adapters with Real Infrastructure (CI Only)

    @pytest.mark.integration
    @pytest.mark.skipif(not CI, reason="Requires infrastructure")
    async def test_k8s_adapter():
    adapter = K8sQueueOrchestrator(namespace="test")
    # Test against real Kubernetes
  3. Create Fake Implementations

    class FakeTelephonyAdapter(TelephonyPort):
    """In-memory fake for fast integration tests."""
    # Implement all methods with in-memory state

Code Examples

Example 1: Creating a New Port

# core/ports/recording.py
from abc import ABC, abstractmethod
from typing import List, Optional

class RecordingPort(ABC):
"""Port for call recording operations."""

@abstractmethod
async def start_recording(
self, call_uuid: str, filename: str
) -> bool:
"""Start recording a call."""
raise NotImplementedError

@abstractmethod
async def stop_recording(self, call_uuid: str) -> bool:
"""Stop recording a call."""
raise NotImplementedError

@abstractmethod
async def list_recordings(self) -> List[str]:
"""List all recordings."""
raise NotImplementedError

Example 2: Implementing an Adapter

# adapters/recording_freeswitch.py
import logging
from core.ports.recording import RecordingPort
from adapters.xml_rpc_client import get_xml_rpc_client

logger = logging.getLogger(__name__)

class FreeSWITCHRecordingAdapter(RecordingPort):
"""Adapter for FreeSWITCH call recording."""

async def start_recording(
self, call_uuid: str, filename: str
) -> bool:
try:
client = await get_xml_rpc_client()
response = await client.execute_command(
"uuid_record",
f"{call_uuid} start {filename}"
)
return response.success
except Exception as e:
logger.error(f"Failed to start recording: {e}")
return False

async def stop_recording(self, call_uuid: str) -> bool:
try:
client = await get_xml_rpc_client()
response = await client.execute_command(
"uuid_record",
f"{call_uuid} stop"
)
return response.success
except Exception as e:
logger.error(f"Failed to stop recording: {e}")
return False

async def list_recordings(self) -> List[str]:
# Implementation for listing recordings
pass

Example 3: Adding to DI Container

# core/di.py
from functools import lru_cache
from core.ports.recording import RecordingPort
from adapters.recording_freeswitch import FreeSWITCHRecordingAdapter

@lru_cache(maxsize=1)
def get_recording() -> RecordingPort:
"""Get recording adapter singleton."""
return FreeSWITCHRecordingAdapter()

Example 4: Using in Routes

# routers/recording.py
from fastapi import APIRouter, Depends
from core.ports.recording import RecordingPort
from core.di import get_recording

router = APIRouter(prefix="/v1/recording")

@router.post("/{call_uuid}/start")
async def start_recording(
call_uuid: str,
filename: str,
recording: RecordingPort = Depends(get_recording)
):
"""Start recording a call."""
success = await recording.start_recording(call_uuid, filename)
return {"success": success}

@router.post("/{call_uuid}/stop")
async def stop_recording(
call_uuid: str,
recording: RecordingPort = Depends(get_recording)
):
"""Stop recording a call."""
success = await recording.stop_recording(call_uuid)
return {"success": success}

Example 5: Testing with Mocks

# tests/test_recording.py
from unittest.mock import Mock
import pytest
from fastapi.testclient import TestClient
from core.ports.recording import RecordingPort
from core.di import get_recording
from app import app

@pytest.fixture
def mock_recording():
"""Create mock recording adapter."""
mock = Mock(spec=RecordingPort)
mock.start_recording.return_value = True
mock.stop_recording.return_value = True
return mock

def test_start_recording(mock_recording):
"""Test start recording endpoint."""
app.dependency_overrides[get_recording] = lambda: mock_recording
client = TestClient(app)

response = client.post(
"/v1/recording/test-uuid-123/start?filename=recording1.wav"
)

assert response.status_code == 200
assert response.json()["success"] is True
mock_recording.start_recording.assert_called_once_with(
"test-uuid-123", "recording1.wav"
)

Diagrams

Diagram 1: Hexagonal Architecture Overview

                      ┌─────────────────────┐
│ HTTP Requests │
└──────────┬──────────┘

v
┌────────────────────────────────────────┐
│ FastAPI Routes │
│ (Presentation Layer) │
│ │
│ - Request validation │
│ - Response serialization │
│ - HTTP error handling │
└─────────────────┬──────────────────────┘

│ Depends on
v
┌────────────────────────────────────────┐
│ Application Core │
│ (Port Interfaces) │
│ │
│ - QueueOrchestratorPort │
│ - TelephonyPort │
│ - CallcenterPort │
│ - DirectoryPort │
└─────────────────┬──────────────────────┘

│ Implemented by
v
┌────────────────────────────────────────┐
│ Infrastructure Layer │
│ (Adapters) │
│ │
│ - K8sQueueOrchestrator │
│ - XMLRPCTelephonyAdapter │
│ - DatabaseCallcenterAdapter │
│ - XMLCURLDirectoryAdapter │
└─────────────────┬──────────────────────┘

│ Connects to
v
┌────────────────────────────────────────┐
│ External Systems │
│ │
│ Kubernetes PostgreSQL FreeSWITCH │
└────────────────────────────────────────┘

Diagram 2: Port-Adapter Relationship

┌──────────────────────┐         ┌──────────────────────┐
│ QueueOrchestratorPort │ TelephonyPort │
│ (Abstract Interface)│ │ (Abstract Interface)│
│ │ │ │
│ + create_queue() │ │ + originate_call() │
│ + delete_queue() │ │ + hangup_call() │
│ + get_all_queues() │ │ + transfer_call() │
└──────────┬───────────┘ └──────────┬───────────┘
│ │
│ implements │ implements
│ │
┌──────────▼───────────┐ ┌─────────▼────────────┐
│ K8sQueueOrchestrator │ │ XMLRPCTelephonyAdapter│
│ │ │ │
│ Uses: │ │ Uses: │
│ - Kubernetes API │ │ - XML-RPC Client │
│ - YAML templates │ │ - Async semaphore │
│ - Server-side apply │ │ - Retry logic │
└──────────────────────┘ └──────────────────────┘

Diagram 3: Dependency Injection Flow

1. Application Startup
┌────────────────────┐
│ FastAPI lifespan │
│ │
│ initialize_*() │
└─────────┬──────────┘

v
┌─────────────────────┐
│ core/di.py │
│ │
│ - Create adapters │
│ - Connect pools │
└─────────┬───────────┘

v
┌─────────────────────┐
│ Adapter Instances │
│ │
│ Stored in cache │
└─────────────────────┘

2. Request Handling
┌────────────────────┐
│ HTTP Request │
└─────────┬──────────┘

v
┌─────────────────────┐
│ FastAPI Route │
│ │
│ Depends(get_port) │
└─────────┬───────────┘

v
┌─────────────────────┐
│ DI Container │
│ │
│ Returns cached │
│ adapter instance │
└─────────┬───────────┘

v
┌─────────────────────┐
│ Adapter executes │
│ infrastructure op │
└─────────────────────┘

3. Application Shutdown
┌────────────────────┐
│ FastAPI shutdown │
│ │
│ shutdown_*() │
└─────────┬──────────┘

v
┌─────────────────────┐
│ Adapters cleanup │
│ │
│ - Close pools │
│ - Release resources│
└─────────────────────┘

References


Summary

The Ports & Adapters pattern provides:

  • Testability: Unit tests without infrastructure
  • Flexibility: Easy backend substitution
  • Maintainability: Clear separation of concerns
  • Progressive Migration: Incremental adoption
  • Type Safety: Strong contracts via abstract interfaces
  • Dependency Control: Unidirectional flow (routes → ports ← adapters)

Next Steps:

  1. Complete migration of remaining routes
  2. Add contract tests for all adapters
  3. Create fake implementations for integration testing
  4. Document port design guidelines