Monitoring Guide¶
Overview¶
Effective monitoring is essential for maintaining the health, performance, and reliability of CreativeDynamics in production. This guide covers monitoring strategies, tooling, and observability best practices.
Monitoring Architecture¶
Key Metrics Categories¶
Application Metrics
Request throughput and latency
Error rates and types
API endpoint performance
Analysis job duration
Memory and CPU usage
Business Metrics
Analysis completions per hour
Data processing volume
Creative fatigue detection accuracy
User engagement patterns
Infrastructure Metrics
System resources (CPU, memory, disk, network)
Container/pod health
Service availability
Database performance
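Most of the metrics above map onto three Prometheus metric types: counters (only go up), gauges (go up and down), and histograms (bucketed observations). A minimal stdlib-only sketch of the distinction, with no Prometheus client required (the class names mirror, but are not, the `prometheus_client` API):

```python
import bisect

class Counter:
    """Monotonically increasing value, e.g. total requests."""
    def __init__(self):
        self.value = 0.0
    def inc(self, amount=1.0):
        if amount < 0:
            raise ValueError("counters only go up")
        self.value += amount

class Gauge:
    """Value that can rise and fall, e.g. active analyses."""
    def __init__(self):
        self.value = 0.0
    def inc(self, amount=1.0):
        self.value += amount
    def dec(self, amount=1.0):
        self.value -= amount

class Histogram:
    """Observations counted into buckets by upper bound, e.g. request duration."""
    def __init__(self, buckets=(0.1, 0.5, 1.0, 5.0)):
        self.buckets = sorted(buckets)
        self.counts = [0] * (len(self.buckets) + 1)  # last slot is +Inf
        self.total = 0.0
        self.count = 0
    def observe(self, value):
        self.counts[bisect.bisect_left(self.buckets, value)] += 1
        self.total += value
        self.count += 1

requests = Counter(); requests.inc()
active = Gauge(); active.inc(); active.dec()
latency = Histogram(); latency.observe(0.3)  # lands in the (0.1, 0.5] bucket
```

Choosing the right type matters: rates and quantiles in the PromQL examples later in this guide only make sense over counters and histograms, respectively.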
Prometheus Integration¶
Setup Prometheus Metrics¶
Add to your FastAPI application (for example, a thin wrapper module in your deployment that mounts creativedynamics.api.main:app):
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST
from fastapi import Response
import time

# Define metrics
request_count = Counter(
    'creativedynamics_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'creativedynamics_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

active_analyses = Gauge(
    'creativedynamics_active_analyses',
    'Number of active analyses'
)

analysis_duration = Histogram(
    'creativedynamics_analysis_duration_seconds',
    'Analysis job duration in seconds',
    ['dataset', 'status']
)

error_count = Counter(
    'creativedynamics_errors_total',
    'Total number of errors',
    ['error_type', 'endpoint']
)

data_processed = Counter(
    'creativedynamics_data_processed_bytes',
    'Total bytes of data processed'
)

# Middleware for automatic metrics collection
async def metrics_middleware(request, call_next):
    start_time = time.time()

    # Process request
    response = await call_next(request)

    # Record metrics
    duration = time.time() - start_time
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

# Metrics endpoint
async def metrics_endpoint():
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
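Wire these up by registering `metrics_middleware` with FastAPI's `@app.middleware("http")` decorator and exposing `metrics_endpoint` at `/metrics`. The middleware's timing logic can also be exercised without a running server; the sketch below simulates it with stub request/response objects (the stubs and the `observed` accumulator are illustrative stand-ins for FastAPI objects and the Prometheus metrics):

```python
import asyncio
import time
from types import SimpleNamespace

# Stand-in for the Prometheus counters/histograms defined above.
observed = {"count": 0, "durations": []}

async def metrics_middleware(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    observed["count"] += 1
    observed["durations"].append((request.method, request.url.path, duration))
    return response

async def fake_handler(request):
    # Stub downstream handler standing in for a real route.
    await asyncio.sleep(0.01)
    return SimpleNamespace(status_code=200)

request = SimpleNamespace(method="GET", url=SimpleNamespace(path="/health"))
response = asyncio.run(metrics_middleware(request, fake_handler))
```

This pattern (time, delegate, record) is why the middleware adds negligible overhead: the only extra work per request is two clock reads and two metric updates.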
Prometheus Configuration¶
Create prometheus.yml:
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'creativedynamics'
    static_configs:
      - targets: ['localhost:5001']
    metrics_path: '/metrics'

  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

rule_files:
  - 'alerts.yml'

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['localhost:9093']
Alert Rules¶
Create alerts.yml:
groups:
  - name: creativedynamics_alerts
    interval: 30s
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(creativedynamics_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors per second"

      # High latency
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High API latency"
          description: "95th percentile latency is {{ $value }} seconds"

      # Service down
      - alert: ServiceDown
        expr: up{job="creativedynamics"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "CreativeDynamics service is down"
          description: "Service has been down for more than 1 minute"

      # High memory usage
      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 90%"
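The `rate(...[5m])` expressions in these rules compute a per-second increase of a counter over a sliding window. A stdlib sketch of that calculation over hypothetical counter samples (a simple two-endpoint rate, not Prometheus's extrapolated estimate):

```python
def simple_rate(samples, window_seconds):
    """Per-second increase of a counter over samples = [(t_seconds, value), ...]."""
    newest = samples[-1][0]
    in_window = [(t, v) for t, v in samples if newest - t <= window_seconds]
    if len(in_window) < 2:
        return 0.0
    (t0, v0), (t1, v1) = in_window[0], in_window[-1]
    return (v1 - v0) / (t1 - t0)

# Hypothetical samples: the error counter rose from 100 to 130 over 300 s
samples = [(0, 100), (150, 118), (300, 130)]
error_rate = simple_rate(samples, 300)   # 30 errors / 300 s = 0.1 errors/sec
fires = error_rate > 0.05                # the HighErrorRate threshold above
```

Because `rate()` works on the *increase* rather than raw values, counter resets (e.g. after a restart) do not produce huge negative spikes in real Prometheus; this sketch omits that reset handling.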
Grafana Dashboards¶
Main Dashboard JSON¶
Create grafana-dashboard.json:
{
  "dashboard": {
    "title": "CreativeDynamics Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Response Time (95th percentile)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "rate(creativedynamics_errors_total[5m])",
            "legendFormat": "{{error_type}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Active Analyses",
        "targets": [
          {
            "expr": "creativedynamics_active_analyses"
          }
        ],
        "type": "stat"
      },
      {
        "title": "Analysis Duration",
        "targets": [
          {
            "expr": "histogram_quantile(0.5, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "Median"
          },
          {
            "expr": "histogram_quantile(0.95, rate(creativedynamics_analysis_duration_seconds_bucket[1h]))",
            "legendFormat": "95th percentile"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Data Processed",
        "targets": [
          {
            "expr": "rate(creativedynamics_data_processed_bytes[5m])",
            "legendFormat": "Bytes/sec"
          }
        ],
        "type": "graph"
      }
    ]
  }
}
Logging Strategy¶
Structured Logging¶
Configure structured logging in your application:
import logging
from datetime import datetime
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['timestamp'] = datetime.utcnow().isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        log_record['module'] = record.module
        log_record['function'] = record.funcName
        log_record['line'] = record.lineno

        # Add correlation ID if available
        if hasattr(record, 'correlation_id'):
            log_record['correlation_id'] = record.correlation_id

        # Add user context if available
        if hasattr(record, 'user_id'):
            log_record['user_id'] = record.user_id

def setup_logging(level='INFO', log_file=None):
    """Configure structured JSON logging."""
    formatter = CustomJsonFormatter()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Optional file handler
    handlers = [console_handler]
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        handlers.append(file_handler)

    # Configure root logger
    logging.basicConfig(
        level=getattr(logging, level),
        handlers=handlers
    )
    return logging.getLogger(__name__)
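If pulling in `python-json-logger` is undesirable, the same structured output can be approximated with the stdlib alone. This is a minimal sketch, not a drop-in replacement for the formatter above:

```python
import io
import json
import logging
from datetime import datetime, timezone

class StdlibJsonFormatter(logging.Formatter):
    """Emit one JSON object per log record using only the stdlib."""
    def format(self, record):
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "message": record.getMessage(),
        }
        # `extra={...}` kwargs become attributes on the record
        if hasattr(record, "correlation_id"):
            payload["correlation_id"] = record.correlation_id
        return json.dumps(payload)

stream = io.StringIO()  # capture output for demonstration
handler = logging.StreamHandler(stream)
handler.setFormatter(StdlibJsonFormatter())
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

logger.info("analysis complete", extra={"correlation_id": "abc-123"})
entry = json.loads(stream.getvalue())
```

One JSON object per line is exactly what the Logstash `json` codec in the next section expects, so either formatter feeds the same pipeline.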
Log Aggregation with ELK Stack¶
Logstash Configuration¶
Create logstash.conf:
input {
  file {
    path => "/var/log/creativedynamics/*.log"
    start_position => "beginning"
    codec => "json"
  }
}

filter {
  if [level] == "ERROR" {
    mutate {
      add_tag => [ "error" ]
    }
  }

  if [duration] {
    ruby {
      code => "
        event.set('duration_ms', event.get('duration') * 1000)
      "
    }
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "creativedynamics-%{+YYYY.MM.dd}"
  }

  if "error" in [tags] {
    email {
      to => "alerts@example.com"
      subject => "CreativeDynamics Error Alert"
      body => "Error in %{module}.%{function}: %{message}"
    }
  }
}
Kibana Queries¶
Common queries for analysis:
// Find slow analyses
{
  "query": {
    "bool": {
      "must": [
        { "term": { "function": "analyze_all_items" } },
        { "range": { "duration": { "gte": 60 } } }
      ]
    }
  }
}

// Error distribution
{
  "aggs": {
    "errors_by_type": {
      "terms": {
        "field": "error_type.keyword",
        "size": 10
      }
    }
  }
}

// User activity patterns
{
  "aggs": {
    "requests_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1h"
      }
    }
  }
}
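Query bodies like these can be assembled programmatically rather than hand-edited. A small stdlib helper, mirroring the slow-analyses query above (the function name and threshold are taken from that example; the helper itself is illustrative):

```python
import json

def slow_function_query(function_name, min_duration_s):
    """Build an Elasticsearch bool query for slow calls to one function."""
    return {
        "query": {
            "bool": {
                "must": [
                    {"term": {"function": function_name}},
                    {"range": {"duration": {"gte": min_duration_s}}},
                ]
            }
        }
    }

body = slow_function_query("analyze_all_items", 60)
payload = json.dumps(body)  # ready to POST to an index's _search endpoint
```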
Distributed Tracing¶
OpenTelemetry Integration¶
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Configure tracing
def setup_tracing():
    # Set up the tracer provider
    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # Configure OTLP exporter
    otlp_exporter = OTLPSpanExporter(
        endpoint="localhost:4317",
        insecure=True
    )

    # Add span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Instrument FastAPI (instrument() is called on an instance)
    FastAPIInstrumentor().instrument()

    # Instrument the requests library
    RequestsInstrumentor().instrument()

    return tracer

# Use in application
tracer = setup_tracing()

@app.post("/analyze")
async def analyze_endpoint(data: dict):
    with tracer.start_as_current_span("analyze_request") as span:
        span.set_attribute("data.size", len(str(data)))
        span.set_attribute("data.items", len(data.get("items", [])))

        # Process analysis
        with tracer.start_as_current_span("data_validation"):
            validate_data(data)

        with tracer.start_as_current_span("signature_calculation"):
            signatures = calculate_signatures(data)

        with tracer.start_as_current_span("fatigue_detection"):
            results = detect_fatigue(signatures)

        return results
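The nested `start_as_current_span` calls above build a tree of timed sections: each child span's duration is contained in its parent's. A stdlib-only sketch of that idea using a context manager (illustrative only, not the OpenTelemetry API):

```python
import time
from contextlib import contextmanager

spans = []   # (name, nesting_depth, duration_seconds), in completion order
_depth = 0

@contextmanager
def span(name):
    """Record how long the enclosed block took and at what nesting depth."""
    global _depth
    _depth += 1
    start = time.perf_counter()
    try:
        yield
    finally:
        _depth -= 1  # back to the depth this span was opened at
        spans.append((name, _depth, time.perf_counter() - start))

with span("analyze_request"):
    with span("data_validation"):
        pass
    with span("signature_calculation"):
        time.sleep(0.01)  # simulate work
```

Children finish before their parent, so completion order plus depth is enough to reconstruct the tree; real tracers do the same with explicit parent-span IDs so the tree survives crossing process boundaries.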
Health Checks and Probes¶
Detailed Health Check¶
from typing import Dict, Any
import asyncio
import time
from datetime import datetime

import psutil

class HealthChecker:
    def __init__(self):
        self.checks = []

    def register_check(self, name: str, check_func):
        self.checks.append((name, check_func))

    async def run_checks(self) -> Dict[str, Any]:
        results = {
            "timestamp": datetime.utcnow().isoformat(),
            "status": "healthy",
            "checks": {}
        }

        for name, check_func in self.checks:
            try:
                if asyncio.iscoroutinefunction(check_func):
                    result = await check_func()
                else:
                    result = check_func()
                results["checks"][name] = {
                    "status": "healthy",
                    "details": result
                }
            except Exception as e:
                results["status"] = "unhealthy"
                results["checks"][name] = {
                    "status": "unhealthy",
                    "error": str(e)
                }

        return results

# Health check functions
def check_memory():
    memory = psutil.virtual_memory()
    return {
        "used_percent": memory.percent,
        "available_gb": memory.available / (1024**3)
    }

def check_disk():
    disk = psutil.disk_usage('/')
    return {
        "used_percent": disk.percent,
        "free_gb": disk.free / (1024**3)
    }

async def check_api_latency():
    import httpx
    start = time.time()
    async with httpx.AsyncClient() as client:
        await client.get("http://localhost:5001/health")
    return {"latency_ms": (time.time() - start) * 1000}

# Register checks
health_checker = HealthChecker()
health_checker.register_check("memory", check_memory)
health_checker.register_check("disk", check_disk)
health_checker.register_check("api_latency", check_api_latency)

# Health endpoint
@app.get("/health/detailed")
async def detailed_health():
    return await health_checker.run_checks()
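The key property of this pattern is that one failing check flips the overall status without aborting the remaining checks. A self-contained demo (a trimmed copy of the class above, with a deliberately failing check to show the aggregation; no psutil or HTTP calls needed):

```python
import asyncio
from datetime import datetime, timezone

class HealthChecker:
    def __init__(self):
        self.checks = []

    def register_check(self, name, check_func):
        self.checks.append((name, check_func))

    async def run_checks(self):
        results = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "healthy",
            "checks": {},
        }
        for name, check_func in self.checks:
            try:
                if asyncio.iscoroutinefunction(check_func):
                    result = await check_func()
                else:
                    result = check_func()
                results["checks"][name] = {"status": "healthy", "details": result}
            except Exception as e:
                # One failure marks the service unhealthy but the loop continues
                results["status"] = "unhealthy"
                results["checks"][name] = {"status": "unhealthy", "error": str(e)}
        return results

checker = HealthChecker()
checker.register_check("always_ok", lambda: {"ok": True})
checker.register_check("always_fails", lambda: 1 / 0)
report = asyncio.run(checker.run_checks())
```

Here `report["status"]` is `"unhealthy"` while the `always_ok` entry still reports its details, which is exactly what an orchestrator probe wants to see.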
Performance Monitoring¶
APM with Datadog¶
from ddtrace import tracer, patch_all
from ddtrace.contrib.asgi import TraceMiddleware

# Patch all supported libraries
patch_all()

# Point the tracer at the local Datadog agent. Service name, environment,
# and version are usually supplied via the DD_SERVICE, DD_ENV, and
# DD_VERSION environment variables rather than in code.
tracer.configure(
    hostname='localhost',
    port=8126,
)

# Add middleware to FastAPI
app.add_middleware(TraceMiddleware, tracer=tracer)

# Custom spans
@tracer.wrap(name='custom.analysis')
def perform_analysis(data):
    span = tracer.current_span()
    span.set_tag('data.size', len(data))
    span.set_metric('items.count', len(data.get('items', [])))

    # Analysis logic goes here
    results = ...
    return results
Alerting Configuration¶
AlertManager Setup¶
Create alertmanager.yml:
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'creativedynamics@example.com'
  smtp_auth_username: 'creativedynamics@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'team-notifications'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true
    - match:
        severity: warning
      receiver: 'slack-warnings'

receivers:
  - name: 'team-notifications'
    email_configs:
      - to: 'team@example.com'
        headers:
          Subject: 'CreativeDynamics Alert: {{ .GroupLabels.alertname }}'

  - name: 'pagerduty-critical'
    pagerduty_configs:
      - service_key: 'YOUR-PAGERDUTY-KEY'
        description: '{{ .CommonAnnotations.summary }}'

  - name: 'slack-warnings'
    slack_configs:
      - api_url: 'YOUR-SLACK-WEBHOOK-URL'
        channel: '#creativedynamics-alerts'
        title: 'CreativeDynamics Warning'
        text: '{{ .CommonAnnotations.description }}'
Monitoring Best Practices¶
1. Key Performance Indicators (KPIs)¶
Monitor these essential KPIs:
Golden Signals:
Latency: Response time distribution
Traffic: Requests per second
Errors: Error rate and types
Saturation: Resource utilization
Business Metrics:
Analysis completion rate
Average processing time
Data volume processed
API usage patterns
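The four golden signals can be derived directly from raw request records. A stdlib sketch over hypothetical sample data (the request tuples and 60-second window are illustrative):

```python
from statistics import quantiles

# Hypothetical request log for one window: (duration_seconds, status_code)
requests = [(0.12, 200), (0.30, 200), (0.05, 200), (2.10, 500),
            (0.25, 200), (0.40, 200), (0.08, 200), (1.90, 503)]
window_seconds = 60

# Traffic: requests per second over the window
traffic = len(requests) / window_seconds

# Errors: fraction of requests that failed with a 5xx status
errors = sum(1 for _, status in requests if status >= 500)
error_rate = errors / len(requests)

# Latency: 95th-percentile duration (quantiles() returns cut points 1..99)
durations = sorted(d for d, _ in requests)
p95_latency = quantiles(durations, n=100)[94]
```

Saturation, the fourth signal, is not derivable from request records alone; it comes from resource metrics like the `node_exporter` memory and CPU gauges scraped earlier in this guide.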
2. Monitoring Checklist¶
Application metrics exposed via /metrics
Infrastructure monitoring configured
Log aggregation pipeline set up
Distributed tracing enabled
Alert rules defined and tested
Dashboards created for key metrics
Runbooks documented for alerts
Regular monitoring review scheduled
3. Incident Response¶
Detection: Automated alerts trigger
Triage: Assess severity and impact
Diagnosis: Use monitoring tools to identify root cause
Resolution: Apply fix and verify
Post-mortem: Document learnings and improvements
Dashboard Examples¶
Service Overview Dashboard¶
# Metrics to display
service_metrics = {
    "uptime": "process_uptime_seconds",
    "request_rate": "rate(creativedynamics_requests_total[5m])",
    "error_rate": "rate(creativedynamics_errors_total[5m])",
    "p95_latency": "histogram_quantile(0.95, rate(creativedynamics_request_duration_seconds_bucket[5m]))",
    "active_connections": "creativedynamics_active_connections",
    "memory_usage": "process_resident_memory_bytes",
    "cpu_usage": "rate(process_cpu_seconds_total[5m])"
}
Analysis Performance Dashboard¶
# Analysis-specific metrics
analysis_metrics = {
    "analyses_per_hour": "rate(creativedynamics_analyses_total[1h]) * 3600",
    "avg_analysis_time": "rate(creativedynamics_analysis_duration_seconds_sum[1h]) / rate(creativedynamics_analysis_duration_seconds_count[1h])",
    "signature_calc_time": "histogram_quantile(0.5, rate(creativedynamics_signature_calculation_seconds_bucket[5m]))",
    "wastage_detection_accuracy": "creativedynamics_wastage_detection_accuracy",
    "data_processing_rate": "rate(creativedynamics_data_processed_bytes[5m])"
}
Next Steps¶
Configure Scaling for handling increased load
Review Security for monitoring security events
Set up Deployment with monitoring integration