Good logging helps you understand what your agent is doing. In production, observability is essential for debugging issues and improving performance.

Basic Logging with Loguru

The Atoms SDK uses Loguru for logging. Set it up in your agent:
from loguru import logger


class MyAgent(OutputAgentNode):
    async def generate_response(self):
        logger.info("Generating response")
        
        try:
            response = await self.llm.chat(
                messages=self.context.messages,
                stream=True
            )
            
            async for chunk in response:
                if chunk.content:
                    yield chunk.content
                    
        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            yield "I'm having trouble right now. Please try again."

Log Levels

Use appropriate log levels:
Level   | Use For                 | Example
------- | ----------------------- | -----------------------------
DEBUG   | Detailed debugging info | Message contents, event data
INFO    | Normal operations       | Session start, tool calls
WARNING | Unexpected but handled  | Fallback used, retry needed
ERROR   | Failures                | API errors, exceptions
logger.debug(f"Received event: {event}")
logger.info(f"Session started: {session.id}")
logger.warning("Using fallback LLM; primary failed")
logger.error(f"Tool execution failed: {error}")

Structured Logging

Add context to your logs. With Loguru, keyword arguments passed to a logging call are captured into the record's extra dict, so there is no stdlib-style extra= parameter to fill:
logger.info(
    "Tool executed",
    tool_name="get_order",
    duration_ms=145,
    success=True,
    session_id=self.session_id
)
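
If several calls share the same fields, bind them once instead of repeating them. A minimal sketch using Loguru's bind(), with placeholder values:
from loguru import logger

# bind() returns a logger with these fields attached to every record it
# emits; the values here are placeholders for whatever your agent tracks
log = logger.bind(session_id="sess_123", agent="MyAgent")

log.info("Tool executed", tool_name="get_order")
log.info("Response sent")  # still carries session_id and agent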

Logging Events

Track all events flowing through your agent:
class LoggingAgent(OutputAgentNode):
    async def process_event(self, event):
        logger.debug(
            "Processing event",
            event_type=event.type,
            session_id=self.session_id
        )
        
        await super().process_event(event)
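
Logging every event at DEBUG gets noisy for high-frequency traffic such as audio chunks. One way to skip them, sketched with a hypothetical event type name:
class QuietLoggingAgent(OutputAgentNode):
    # Hypothetical high-frequency event types to keep out of the logs
    NOISY_EVENT_TYPES = {"audio_chunk"}

    async def process_event(self, event):
        if event.type not in self.NOISY_EVENT_TYPES:
            logger.debug("Processing event", event_type=event.type)
        await super().process_event(event)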

Logging Tool Calls

Track tool usage:
import time


async def generate_response(self):
    # ... LLM call ...

    if tool_calls:
        for call in tool_calls:
            logger.info(
                "Executing tool: {tool_name}",
                tool_name=call.name,
                arguments=call.args
            )
        
        start = time.time()
        results = await self.tool_registry.execute(tool_calls, parallel=True)
        duration = time.time() - start
        
        logger.info(
            "Tools executed in {duration_seconds:.2f}s",
            tool_count=len(tool_calls),
            duration_seconds=duration
        )
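
If you time several operations this way, a small helper keeps the pattern in one place. A generic sketch, not part of the SDK:
import time
from contextlib import contextmanager

from loguru import logger


@contextmanager
def log_duration(label, **fields):
    # Logs how long the wrapped block took, plus any extra fields
    start = time.time()
    try:
        yield
    finally:
        logger.info(
            "{label} finished in {duration:.2f}s",
            label=label,
            duration=time.time() - start,
            **fields
        )
Wrap it around any awaited call, for example with log_duration("tool execution", tool_count=len(tool_calls)): around the execute() call above.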

Session Logging

Log session lifecycle:
async def setup(session: AgentSession):
    logger.info(f"Session started: {session.id}")
    
    agent = MyAgent()
    session.add_node(agent)
    await session.start()
    
    try:
        await session.wait_until_complete()
    finally:
        logger.info(f"Session ended: {session.id}")

Log Output Configuration

Configure log output format:
import sys
from loguru import logger

# Remove default handler
logger.remove()

# Add custom handler
logger.add(
    sys.stderr,
    format="{time:HH:mm:ss} | {level} | {message}",
    level="INFO"
)

# Add file handler for debugging
logger.add(
    "logs/agent_{time}.log",
    rotation="100 MB",
    retention="7 days",
    level="DEBUG"
)
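
If your logs feed an aggregator (Loki, Datadog, CloudWatch, and the like), structured JSON is easier to query than formatted text. Loguru can serialize each record, extra fields included, as one JSON object per line:
import sys
from loguru import logger

logger.remove()

# serialize=True emits each record as JSON, including the extra dict
# populated by keyword arguments, bind(), and contextualize()
logger.add(sys.stdout, serialize=True, level="INFO")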

Performance Tracking

Measure response times:
import time


class TimedAgent(OutputAgentNode):
    async def generate_response(self):
        start = time.time()
        chunk_count = 0
        
        async for chunk in self._generate():
            chunk_count += 1
            yield chunk
        
        duration = time.time() - start
        logger.info(
            "Response generated",
            duration_seconds=duration,
            chunk_count=chunk_count,
            avg_chunk_time=duration / chunk_count if chunk_count else 0
        )
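
For streaming responses, total duration hides what the user actually feels: the wait before the first chunk. A variant that also records time to first chunk, assuming the same _generate() helper:
class StreamTimedAgent(OutputAgentNode):
    async def generate_response(self):
        start = time.time()
        first_chunk_time = None

        async for chunk in self._generate():
            if first_chunk_time is None:
                # Latency until the user starts seeing or hearing output
                first_chunk_time = time.time() - start
            yield chunk

        logger.info(
            "Response generated",
            time_to_first_chunk=first_chunk_time,
            total_seconds=time.time() - start
        )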

Error Tracking

Capture and log errors with context:
async def generate_response(self):
    try:
        async for chunk in self._generate():
            yield chunk
            
    except Exception as e:
        logger.exception(
            "Response generation failed",
            error_type=type(e).__name__,
            session_id=self.session_id,
            message_count=len(self.context.messages)
        )
        yield "I encountered an error. Let me try again."

Metrics Collection

Track key metrics for dashboards:
from prometheus_client import Counter, Histogram

# Define metrics
tool_calls = Counter("agent_tool_calls_total", "Total tool calls", ["tool_name"])
response_time = Histogram("agent_response_seconds", "Response generation time")


class MetricsAgent(OutputAgentNode):
    async def generate_response(self):
        with response_time.time():
            # ... response generation ...
            pass
    
    @function_tool()
    def get_order(self, order_id: str) -> dict:
        tool_calls.labels(tool_name="get_order").inc()
        # ... implementation ...
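
Prometheus needs an HTTP endpoint to scrape. prometheus_client ships a minimal server for this; call it once at startup (the port is an example):
from prometheus_client import start_http_server

# Exposes all registered metrics at http://localhost:9100/metrics
start_http_server(9100)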

Debug Mode

Add a debug mode for development:
import os


DEBUG = os.getenv("DEBUG", "false").lower() == "true"


class MyAgent(OutputAgentNode):
    async def generate_response(self):
        if DEBUG:
            logger.debug(f"Context: {self.context.messages}")
        
        # ... normal generation ...
Enable with:
DEBUG=true python agent.py
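
You can wire the same flag into Loguru so DEBUG-level records only appear when it is set. A minimal sketch:
import sys
from loguru import logger

logger.remove()
logger.add(sys.stderr, level="DEBUG" if DEBUG else "INFO")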
