Monitoring Guide

Overview

Effective monitoring is crucial for maintaining the health, performance, and reliability of CreativeDynamics in production. This guide covers detailed monitoring strategies, tools, and best practices for observability.

Monitoring Architecture

Key Metrics Categories

  1. Application Metrics

    • Request throughput and latency

    • Error rates and types

    • API endpoint performance

    • Analysis job duration

    • Memory and CPU usage

  2. Business Metrics

    • Analysis completions per hour

    • Data processing volume

    • Creative fatigue detection accuracy

    • User engagement patterns

  3. Infrastructure Metrics

    • System resources (CPU, memory, disk, network)

    • Container/pod health

    • Service availability

    • Database performance

Prometheus Integration

Set Up Prometheus Metrics

Add to your FastAPI application (for example, a thin wrapper module in your deployment that mounts creativedynamics.api.main:app):

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
from fastapi import Response
import time

# Define metrics
request_count = Counter(
    'creativedynamics_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'creativedynamics_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

active_analyses = Gauge(
    'creativedynamics_active_analyses',
    'Number of active analyses'
)

analysis_duration = Histogram(
    'creativedynamics_analysis_duration_seconds',
    'Analysis job duration in seconds',
    ['dataset', 'status']
)

error_count = Counter(
    'creativedynamics_errors_total',
    'Total number of errors',
    ['error_type', 'endpoint']
)

data_processed = Counter(
    'creativedynamics_data_processed_bytes',
    'Total bytes of data processed'
)

# Middleware for automatic metrics collection
async def metrics_middleware(request, call_next):
    start_time = time.time()
    
    # Process request
    response = await call_next(request)
    
    # Record metrics (note: labelling by the raw path can explode label
    # cardinality if routes contain IDs; prefer the matched route template)
    duration = time.time() - start_time
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    
    return response

# Metrics endpoint
async def metrics_endpoint():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )

# Wire both into the app:
#   app.middleware("http")(metrics_middleware)
#   app.get("/metrics")(metrics_endpoint)
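
The `active_analyses` gauge and `analysis_duration` histogram above are defined but not yet tied to any code path; one way to wire them in is a small context manager around each analysis job. This is a sketch, duck-typed so any objects with `inc`/`dec`/`labels`/`observe` work in place of the prometheus_client metrics; the `dataset` value in the usage line is hypothetical:

```python
import time
from contextlib import contextmanager

@contextmanager
def track_analysis(active_gauge, duration_histogram, dataset):
    """Track one analysis job: bump the active gauge while it runs and
    record its duration, with a status label, when it finishes."""
    active_gauge.inc()
    start = time.time()
    status = "success"
    try:
        yield
    except Exception:
        status = "error"
        raise
    finally:
        active_gauge.dec()
        duration_histogram.labels(dataset=dataset, status=status).observe(
            time.time() - start
        )
```

With the real metrics this becomes `with track_analysis(active_analyses, analysis_duration, dataset="campaign_q3"): run_job()` around each job, so failures and successes are timed and labelled consistently.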

Prometheus Configuration

Create prometheus.yml:

global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'creativedynamics'
    static_configs:
      - targets: ['localhost:5001']
    metrics_path: '/metrics'
    
  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']

Alert Rules

Create alerts.yml:

groups:
  - name: creativedynamics_alerts
    interval: 30s
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(creativedynamics_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"
      
      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"
          description: "95th percentile latency is {{ $value }} seconds"
      
      # Service down
      - alert: ServiceDown
        expr: up{job="creativedynamics"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "CreativeDynamics service is down"
          description: "Service has been down for more than 1 minute"
      
      # High memory usage
      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 90%"

Grafana Dashboards

Main Dashboard JSON

Create grafana-dashboard.json:

{
  "dashboard": {
    "title": "CreativeDynamics Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Response Time (95th percentile)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_errors_total[5m])",
            "legendFormat": "{{error_type}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Active Analyses",
        "targets": [
          {
            "expr": "creativedynamics_active_analyses"
          }
        ],
        "type": "stat"
      },
      {
        "title": "Analysis Duration",
        "targets": [
          {
            "expr": "histogram_quantile(0.5, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "Median"
          },
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Data Processed",
        "targets": [
          {
            "expr": "rate(creativedynamics_data_processed_bytes[5m])",
            "legendFormat": "Bytes/sec"
          }
        ],
        "type": "graph"
      }
    ]
  }
}

Logging Strategy

Structured Logging

Configure structured logging in your application:

import logging
from datetime import datetime, timezone
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['timestamp'] = datetime.now(timezone.utc).isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        log_record['module'] = record.module
        log_record['function'] = record.funcName
        log_record['line'] = record.lineno
        
        # Add correlation ID if available
        if hasattr(record, 'correlation_id'):
            log_record['correlation_id'] = record.correlation_id
        
        # Add user context if available
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id

def setup_logging(level='INFO', log_file=None):
    """Configure structured JSON logging."""
    formatter = CustomJsonFormatter()
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    # File handler
    handlers = [console_handler]
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)
    
    # Configure root logger
    logging.basicConfig(
        level=getattr(logging, level),
        handlers=handlers
    )
    
    return logging.getLogger(__name__)
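
The formatter above only emits `correlation_id` when something has attached it to the log record; the stdlib way to bind it per request is the `extra=` argument or a `logging.LoggerAdapter`. A minimal stdlib-only sketch (the compact formatter here is a stand-in for `CustomJsonFormatter`, and the logger name and ID value are made up):

```python
import io
import json
import logging

# Stand-in formatter: emits the record plus any bound extras as JSON.
class _JsonFormatter(logging.Formatter):
    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        for key in ("correlation_id", "user_id"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(_JsonFormatter())
logger = logging.getLogger("creativedynamics.demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

# Bind a correlation ID once; every log call through the adapter carries it.
request_logger = logging.LoggerAdapter(logger, {"correlation_id": "req-123"})
request_logger.info("analysis started")

print(stream.getvalue().strip())
# → {"level": "INFO", "message": "analysis started", "correlation_id": "req-123"}
```

The adapter's default `process` method injects its dict as `extra`, which is how the fields land on the record for the formatter's `hasattr` checks to find.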

Log Aggregation with ELK Stack

Logstash Configuration

Create logstash.conf:

input {
  file {
    path => "/var/log/creativedynamics/*.log"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  if [level] == "ERROR" {
    mutate {
      add_tag => [ "error" ]
    }
  }
  
  if [duration] {
    ruby {
      code => "
        event.set('duration_ms', event.get('duration') * 1000)
      "
    }
  }
  
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "creativedynamics-%{+YYYY.MM.dd}"
  }
  
  if "error" in [tags] {
    email {
      to => "alerts@example.com"
      subject => "CreativeDynamics Error Alert"
      body => "Error in %{module}.%{function}: %{message}"
    }
  }
}

Kibana Queries

Common queries for analysis:

// Find slow analyses
{
  "query": {
    "bool": {
      "must": [
        { "term": { "function": "analyze_all_items" } },
        { "range": { "duration": { "gte": 60 } } }
      ]
    }
  }
}

// Error distribution
{
  "aggs": {
    "errors_by_type": {
      "terms": {
        "field": "error_type.keyword",
        "size": 10
      }
    }
  }
}

// User activity patterns
{
  "aggs": {
    "requests_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1h"
      }
    }
  }
}

Distributed Tracing

OpenTelemetry Integration

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Configure tracing
def setup_tracing():
    # Set up the tracer provider
    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)
    
    # Configure OTLP exporter
    otlp_exporter = OTLPSpanExporter(
        endpoint="localhost:4317",
        insecure=True
    )
    
    # Add span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)
    
    # Instrument FastAPI (instruments apps created after this call;
    # for an existing app, use FastAPIInstrumentor.instrument_app(app))
    FastAPIInstrumentor().instrument()
    
    # Instrument requests library
    RequestsInstrumentor().instrument()
    
    return tracer

# Use in application
tracer = setup_tracing()

@app.post("/analyze")
async def analyze_endpoint(data: dict):
    with tracer.start_as_current_span("analyze_request") as span:
        span.set_attribute("data.size", len(str(data)))
        span.set_attribute("data.items", len(data.get("items", [])))
        
        # Process analysis
        with tracer.start_as_current_span("data_validation"):
            validate_data(data)
        
        with tracer.start_as_current_span("signature_calculation"):
            signatures = calculate_signatures(data)
        
        with tracer.start_as_current_span("fatigue_detection"):
            results = detect_fatigue(signatures)
        
        return results

Health Checks and Probes

Detailed Health Check

from typing import Dict, Any
import asyncio
import time
from datetime import datetime, timezone

import psutil

class HealthChecker:
    def __init__(self):
        self.checks = []
    
    def register_check(self, name: str, check_func):
        self.checks.append((name, check_func))
    
    async def run_checks(self) -> Dict[str, Any]:
        results = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "healthy",
            "checks": {}
        }
        
        for name, check_func in self.checks:
            try:
                if asyncio.iscoroutinefunction(check_func):
                    result = await check_func()
                else:
                    result = check_func()
                results["checks"][name] = {
                    "status": "healthy",
                    "details": result
                }
            except Exception as e:
                results["status"] = "unhealthy"
                results["checks"][name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }
        
        return results

# Health check functions
def check_memory():
    memory = psutil.virtual_memory()
    return {
        "used_percent": memory.percent,
        "available_gb": memory.available / (1024**3)
    }

def check_disk():
    disk = psutil.disk_usage('/')
    return {
        "used_percent": disk.percent,
        "free_gb": disk.free / (1024**3)
    }

async def check_api_latency():
    import httpx
    start = time.time()
    async with httpx.AsyncClient() as client:
        await client.get("http://localhost:5001/health")
    return {"latency_ms": (time.time() - start) * 1000}

# Register checks
health_checker = HealthChecker()
health_checker.register_check("memory", check_memory)
health_checker.register_check("disk", check_disk)
health_checker.register_check("api_latency", check_api_latency)

# Health endpoint
@app.get("/health/detailed")
async def detailed_health():
    return await health_checker.run_checks()
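
In Kubernetes, the basic `/health` route (fast, dependency-free) suits a liveness probe, while the detailed check fits readiness. A hedged sketch, assuming the container listens on port 5001 and exposes the route names used above; the timing values are illustrative defaults, not measured recommendations:

```yaml
livenessProbe:
  httpGet:
    path: /health
    port: 5001
  initialDelaySeconds: 10
  periodSeconds: 15
  failureThreshold: 3
readinessProbe:
  httpGet:
    path: /health/detailed
    port: 5001
  initialDelaySeconds: 5
  periodSeconds: 10
```

Keeping the liveness probe on the cheap endpoint avoids restarting a pod just because a dependency checked by the detailed probe is slow.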

Performance Monitoring

APM with DataDog

from ddtrace import tracer, patch_all
from ddtrace.contrib.asgi import TraceMiddleware

# Patch all supported libraries
patch_all()

# Configure DataDog (ddtrace 1.x-style configuration; newer releases
# read DD_AGENT_HOST / DD_ENV / DD_SERVICE / DD_VERSION from the environment)
tracer.configure(
    hostname='localhost',
    port=8126,
    analytics_enabled=True,
    env='production',
    service='creativedynamics',
    version='0.9.8.1'
)

# Add middleware to FastAPI
app.add_middleware(
    TraceMiddleware,
    tracer=tracer,
    service="creativedynamics-api",
    distributed_tracing=True
)

# Custom spans
@tracer.wrap(name='custom.analysis')
def perform_analysis(data):
    span = tracer.current_span()
    span.set_tag('data.size', len(data))
    span.set_metric('items.count', len(data.get('items', [])))
    
    # Run the actual analysis (run_analysis is a placeholder for your own logic)
    results = run_analysis(data)
    return results

Alerting Configuration

AlertManager Setup

Create alertmanager.yml:

global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'creativedynamics@example.com'
  smtp_auth_username: 'creativedynamics@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'team-notifications'
  
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true
    
    - match:
        severity: warning
      receiver: 'slack-warnings'

receivers:
  - name: 'team-notifications'
    email_configs:
      - to: 'team@example.com'
        headers:
          Subject: 'CreativeDynamics Alert: {{ .GroupLabels.alertname }}'
  
  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'YOUR-PAGERDUTY-KEY'
        description: '{{ .CommonAnnotations.summary }}'
  
  - name: 'slack-warnings'
    slack_configs:
      - api_url: 'YOUR-SLACK-WEBHOOK-URL'
        channel: '#creativedynamics-alerts'
        title: 'CreativeDynamics Warning'
        text: '{{ .CommonAnnotations.description }}'

Monitoring Best Practices

1. Key Performance Indicators (KPIs)

Monitor these essential KPIs:

  • Golden Signals:

    • Latency: Response time distribution

    • Traffic: Requests per second

    • Errors: Error rate and types

    • Saturation: Resource utilization

  • Business Metrics:

    • Analysis completion rate

    • Average processing time

    • Data volume processed

    • API usage patterns
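
The Errors signal is most useful relative to a target. A quick sketch of the arithmetic for a 99.9% availability objective, using two samples of the request and error counters over a window (all counter values hypothetical):

```python
# Counter values sampled at the start and end of a window (hypothetical).
requests_start, requests_end = 1_000_000, 1_500_000
errors_start, errors_end = 200, 450

requests = requests_end - requests_start  # 500,000 requests in the window
errors = errors_end - errors_start        # 250 errors in the window

error_ratio = errors / requests           # fraction of requests that failed
slo_target = 0.999                        # 99.9% availability objective
error_budget = 1 - slo_target             # 0.1% of requests may fail
budget_consumed = error_ratio / error_budget

print(f"error ratio: {error_ratio:.4%}")          # → error ratio: 0.0500%
print(f"budget consumed: {budget_consumed:.0%}")  # → budget consumed: 50%
```

This is the same ratio the HighErrorRate alert approximates with `rate()`; framing it against a budget tells you how much headroom remains before the objective is breached.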

2. Monitoring Checklist

  • Application metrics exposed via /metrics

  • Infrastructure monitoring configured

  • Log aggregation pipeline set up

  • Distributed tracing enabled

  • Alert rules defined and tested

  • Dashboards created for key metrics

  • Runbooks documented for alerts

  • Regular monitoring review scheduled

3. Incident Response

  1. Detection: Automated alerts trigger

  2. Triage: Assess severity and impact

  3. Diagnosis: Use monitoring tools to identify root cause

  4. Resolution: Apply fix and verify

  5. Post-mortem: Document learnings and improvements

Dashboard Examples

Service Overview Dashboard

# Metrics to display
service_metrics = {
    "uptime": "process_uptime_seconds",
    "request_rate": "rate(creativedynamics_requests_total[5m])",
    "error_rate": "rate(creativedynamics_errors_total[5m])",
    "p95_latency": "histogram_quantile(0.95, creativedynamics_request_duration_seconds_bucket)",
    "active_connections": "creativedynamics_active_connections",
    "memory_usage": "process_resident_memory_bytes",
    "cpu_usage": "rate(process_cpu_seconds_total[5m])"
}

Analysis Performance Dashboard

# Analysis-specific metrics
analysis_metrics = {
    "analyses_per_hour": "rate(creativedynamics_analyses_total[1h]) * 3600",
    "avg_analysis_time": "rate(creativedynamics_analysis_duration_seconds_sum[1h]) / rate(creativedynamics_analysis_duration_seconds_count[1h])",
    "signature_calc_time": "histogram_quantile(0.5, creativedynamics_signature_calculation_seconds_bucket)",
    "wastage_detection_accuracy": "creativedynamics_wastage_detection_accuracy",
    "data_processing_rate": "rate(creativedynamics_data_processed_bytes[5m])"
}
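
The `avg_analysis_time` expression divides the rate of the histogram's `_sum` series by the rate of its `_count` series. The same arithmetic on raw samples, to make the units concrete (sample values hypothetical):

```python
# Two samples of the histogram's _sum and _count series, one hour apart
# (values hypothetical).
sum_t0, sum_t1 = 12_000.0, 15_600.0  # cumulative seconds spent in analyses
count_t0, count_t1 = 400, 520        # cumulative analyses completed

window = 3600  # seconds between the two samples
rate_sum = (sum_t1 - sum_t0) / window        # analysis-seconds per second
rate_count = (count_t1 - count_t0) / window  # analyses per second

avg_duration = rate_sum / rate_count         # seconds per analysis
print(f"average analysis duration: {avg_duration:.1f}s")  # → 30.0s
```

The window cancels out of the division, so the result is simply "seconds of analysis work per completed analysis" over the interval.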

Next Steps

  • Configure Scaling for handling increased load

  • Review Security for monitoring security events

  • Set up Deployment with monitoring integration