Security Best Practices

Overview

This guide provides detailed security guidelines for deploying and maintaining CreativeDynamics in production environments. It covers authentication, authorisation, data protection, vulnerability management, and compliance considerations.

CreativeDynamics provides the core analysis and reporting pipeline. Authentication, authorisation, rate limiting, persistence, and security monitoring are typically implemented in a thin deployment application that wraps creativedynamics.api.main:app.

Security Architecture

Defense in Depth Strategy

┌─────────────────────────────────────┐
│         External Firewall           │
├─────────────────────────────────────┤
│          WAF / DDoS Protection      │
├─────────────────────────────────────┤
│         Load Balancer / Nginx       │
├─────────────────────────────────────┤
│      API Gateway (Rate Limiting)    │
├─────────────────────────────────────┤
│    Application Layer (FastAPI)      │
├─────────────────────────────────────┤
│      Data Layer (Encrypted)         │
└─────────────────────────────────────┘

Authentication & Authorisation

JWT Token Implementation

# your_service/auth/jwt_handler.py
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
import os

# Security configuration
SECRET_KEY = os.getenv("SECRET_KEY", "")  # Must be set in production
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class TokenData(BaseModel):
    username: Optional[str] = None
    scopes: list = []

def create_access_token(
    data: Dict[str, Any],
    expires_delta: Optional[timedelta] = None
) -> str:
    """Create JWT access token."""
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    
    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access"
    })
    
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str) -> TokenData:
    """Verify and decode JWT token."""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        scopes: list = payload.get("scopes", [])
        
        if username is None:
            raise JWTError("Invalid token")
        
        return TokenData(username=username, scopes=scopes)
    
    except JWTError:
        raise ValueError("Could not validate credentials")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify password against hash."""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """Hash password using bcrypt."""
    return pwd_context.hash(password)

OAuth2 Integration

# your_service/auth/oauth2.py
from fastapi import Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from typing import Optional

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "read": "Read access",
        "write": "Write access",
        "admin": "Admin access"
    }
)

async def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme)
):
    """Get current authenticated user."""
    
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    
    try:
        token_data = verify_token(token)
    except ValueError:
        raise credentials_exception
    
    # Check scopes
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    
    return token_data

# Protected endpoint example
@app.get("/api/admin/users")
async def get_users(
    current_user: TokenData = Security(get_current_user, scopes=["admin"])
):
    """Admin-only endpoint."""
    return {"users": get_all_users()}

API Key Authentication

# your_service/auth/api_key.py
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader, APIKeyQuery
import hashlib
import hmac
import time

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)

class APIKeyValidator:
    """Validate API keys with rate limiting."""
    
    def __init__(self):
        self.valid_keys = {}  # Load from database
        self.rate_limits = {}
    
    async def validate_api_key(
        self,
        api_key_header: str = Security(api_key_header),
        api_key_query: str = Security(api_key_query)
    ) -> str:
        """Validate API key from header or query."""
        
        api_key = api_key_header or api_key_query
        
        if not api_key:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="API key required"
            )
        
        # Validate key format
        if not self._is_valid_format(api_key):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key format"
            )
        
        # Check if key exists and is active
        key_data = await self._get_key_data(api_key)
        if not key_data or not key_data.get("active"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or inactive API key"
            )
        
        # Check rate limits
        if not await self._check_rate_limit(api_key):
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Rate limit exceeded"
            )
        
        return api_key
    
    def _is_valid_format(self, api_key: str) -> bool:
        """Check if API key has valid format."""
        # Format: prefix_randomstring (e.g., cd_1234567890abcdef)
        return api_key.startswith("cd_") and len(api_key) == 19
    
    async def _get_key_data(self, api_key: str) -> dict:
        """Retrieve API key data from database."""
        # Hash the API key before database lookup
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        # Query database for key_hash
        return await db.get_api_key(key_hash)
    
    async def _check_rate_limit(self, api_key: str) -> bool:
        """Check if API key has exceeded rate limit."""
        current_time = time.time()
        window = 60  # 1 minute window
        max_requests = 100  # 100 requests per minute
        
        if api_key not in self.rate_limits:
            self.rate_limits[api_key] = []
        
        # Remove old requests outside window
        self.rate_limits[api_key] = [
            t for t in self.rate_limits[api_key]
            if current_time - t < window
        ]
        
        # Check if limit exceeded
        if len(self.rate_limits[api_key]) >= max_requests:
            return False
        
        # Add current request
        self.rate_limits[api_key].append(current_time)
        return True

api_key_validator = APIKeyValidator()

Input Validation & Sanitisation

Request Validation

# your_service/security/validation.py
from pydantic import BaseModel, validator, Field
from typing import List, Optional, Dict, Any
import re
import bleach

class SecureAnalysisRequest(BaseModel):
    """Validated analysis request with security checks."""
    
    dataset_name: str = Field(..., min_length=1, max_length=100)
    items: List[Dict[str, Any]] = Field(..., min_items=1, max_items=10000)
    config: Optional[Dict[str, Any]] = Field(default_factory=dict)
    
    @validator('dataset_name')
    def validate_dataset_name(cls, v):
        """Validate dataset name for security."""
        # Remove any HTML/script tags
        v = bleach.clean(v, tags=[], strip=True)
        
        # Check for path traversal attempts
        if any(char in v for char in ['..', '/', '\\', '\x00']):
            raise ValueError("Invalid characters in dataset name")
        
        # Alphanumeric, spaces, hyphens, underscores only
        if not re.match(r'^[a-zA-Z0-9\s\-_]+$', v):
            raise ValueError("Dataset name contains invalid characters")
        
        return v
    
    @validator('items')
    def validate_items(cls, v):
        """Validate items for security and size."""
        for item in v:
            # Check item size
            item_size = len(str(item))
            if item_size > 100000:  # 100KB per item
                raise ValueError("Item size exceeds maximum allowed")
            
            # Validate item structure
            if 'id' not in item:
                raise ValueError("Item missing required 'id' field")
            
            # Sanitize string fields
            for key, value in item.items():
                if isinstance(value, str):
                    item[key] = bleach.clean(value, tags=[], strip=True)
        
        return v
    
    @validator('config')
    def validate_config(cls, v):
        """Validate configuration for security."""
        # Whitelist allowed configuration keys
        allowed_keys = [
            'signature_depth', 'window_size', 'threshold',
            'min_samples', 'output_format', 'include_plots'
        ]
        
        # Remove any non-whitelisted keys
        v = {k: v[k] for k in v if k in allowed_keys}
        
        # Validate value types and ranges
        if 'signature_depth' in v:
            if not isinstance(v['signature_depth'], int) or not 1 <= v['signature_depth'] <= 10:
                raise ValueError("Invalid signature_depth value")
        
        if 'window_size' in v:
            if not isinstance(v['window_size'], int) or not 1 <= v['window_size'] <= 1000:
                raise ValueError("Invalid window_size value")
        
        return v

# File upload validation
class SecureFileUpload:
    """Secure file upload handling."""
    
    ALLOWED_EXTENSIONS = {'.csv', '.json', '.xlsx'}
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
    
    @staticmethod
    async def validate_file(file):
        """Validate uploaded file for security."""
        
        # Check file extension
        filename = file.filename.lower()
        extension = os.path.splitext(filename)[1]
        if extension not in SecureFileUpload.ALLOWED_EXTENSIONS:
            raise ValueError(f"File type {extension} not allowed")
        
        # Check file size
        contents = await file.read()
        if len(contents) > SecureFileUpload.MAX_FILE_SIZE:
            raise ValueError("File size exceeds maximum allowed")
        
        # Reset file pointer
        await file.seek(0)
        
        # Validate file content matches extension
        if extension == '.csv':
            try:
                # Try to parse as CSV
                pd.read_csv(io.StringIO(contents.decode('utf-8')), nrows=1)
            except:
                raise ValueError("Invalid CSV file")
        
        elif extension == '.json':
            try:
                # Try to parse as JSON
                json.loads(contents)
            except:
                raise ValueError("Invalid JSON file")
        
        return True

SQL Injection Prevention

# your_service/security/database.py
from sqlalchemy import text
from sqlalchemy.orm import Session
import re

class SecureDatabase:
    """Secure database operations."""
    
    @staticmethod
    def execute_query(db: Session, query: str, params: dict = None):
        """Execute query with parameterized inputs."""
        
        # Never use string formatting for queries
        # Always use parameterized queries
        
        # Bad (vulnerable):
        # query = f"SELECT * FROM users WHERE id = {user_id}"
        
        # Good (secure):
        safe_query = text(query)
        result = db.execute(safe_query, params or {})
        
        return result
    
    @staticmethod
    def validate_table_name(table_name: str) -> str:
        """Validate table name to prevent injection."""
        
        # Whitelist allowed table names
        allowed_tables = ['analyses', 'results', 'users', 'api_keys']
        
        if table_name not in allowed_tables:
            raise ValueError(f"Invalid table name: {table_name}")
        
        # Additional validation
        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', table_name):
            raise ValueError("Invalid table name format")
        
        return table_name

Data Protection

Encryption at Rest

# your_service/security/encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import base64
import os

class DataEncryption:
    """Encrypt sensitive data at rest."""
    
    def __init__(self):
        self.key = self._derive_key()
        self.cipher = Fernet(self.key)
    
    def _derive_key(self) -> bytes:
        """Derive encryption key from master key."""
        
        master_key = os.getenv("MASTER_ENCRYPTION_KEY", "").encode()
        salt = os.getenv("ENCRYPTION_SALT", "").encode()
        
        if not master_key or not salt:
            raise ValueError("Encryption keys not configured")
        
        kdf = PBKDF2(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        
        key = base64.urlsafe_b64encode(kdf.derive(master_key))
        return key
    
    def encrypt(self, data: str) -> str:
        """Encrypt string data."""
        return self.cipher.encrypt(data.encode()).decode()
    
    def decrypt(self, encrypted_data: str) -> str:
        """Decrypt string data."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
    
    def encrypt_file(self, file_path: str, output_path: str):
        """Encrypt file contents."""
        with open(file_path, 'rb') as f:
            data = f.read()
        
        encrypted = self.cipher.encrypt(data)
        
        with open(output_path, 'wb') as f:
            f.write(encrypted)
    
    def decrypt_file(self, encrypted_path: str, output_path: str):
        """Decrypt file contents."""
        with open(encrypted_path, 'rb') as f:
            encrypted = f.read()
        
        decrypted = self.cipher.decrypt(encrypted)
        
        with open(output_path, 'wb') as f:
            f.write(decrypted)

# Usage
encryption = DataEncryption()

# Encrypt sensitive configuration
encrypted_config = encryption.encrypt(json.dumps(sensitive_config))

# Decrypt when needed
decrypted_config = json.loads(encryption.decrypt(encrypted_config))

Secure Communication (TLS)

# your_service/security/tls.py
import ssl
import certifi
from fastapi import FastAPI
import uvicorn

def create_ssl_context() -> ssl.SSLContext:
    """Create secure SSL context."""
    
    # Create SSL context with secure defaults
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    
    # Load certificates
    context.load_cert_chain(
        certfile="/path/to/cert.pem",
        keyfile="/path/to/key.pem"
    )
    
    # Set minimum TLS version to 1.2
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    
    # Disable weak ciphers
    context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
    
    # Enable hostname checking
    context.check_hostname = True
    context.verify_mode = ssl.CERT_REQUIRED
    
    return context

# Run API with TLS
if __name__ == "__main__":
    ssl_context = create_ssl_context()
    
    uvicorn.run(
        "creativedynamics.api.main:app",
        host="0.0.0.0",
        port=443,
        ssl_keyfile="/path/to/key.pem",
        ssl_certfile="/path/to/cert.pem",
        ssl_version=ssl.PROTOCOL_TLS,
        ssl_cert_reqs=ssl.CERT_REQUIRED,
        ssl_ciphers="TLSv1.2:!aNULL:!eNULL:!EXPORT:!DES:!MD5:!PSK:!RC4"
    )

Security Headers

FastAPI Security Middleware

# your_service/security/headers.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Add security headers to all responses."""
    
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        # Security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "font-src 'self' data:; "
            "connect-src 'self';"
        )
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = (
            "accelerometer=(), camera=(), geolocation=(), "
            "gyroscope=(), magnetometer=(), microphone=(), "
            "payment=(), usb=()"
        )
        
        return response

def configure_security(app: FastAPI):
    """Configure all security middleware."""
    
    # Security headers
    app.add_middleware(SecurityHeadersMiddleware)
    
    # CORS configuration
    app.add_middleware(
        CORSMiddleware,
        allow_origins=os.getenv("CORS_ORIGINS", "").split(","),
        allow_credentials=True,
        allow_methods=["GET", "POST"],
        allow_headers=["*"],
        max_age=86400,
    )
    
    # Trusted host validation
    app.add_middleware(
        TrustedHostMiddleware,
        allowed_hosts=os.getenv("ALLOWED_HOSTS", "localhost").split(",")
    )
    
    return app

Rate Limiting

Implementation with Redis

# your_service/security/rate_limiting.py
from fastapi import HTTPException, Request, status
from typing import Optional
import redis
import time
import hashlib

class RateLimiter:
    """Token bucket rate limiting with Redis."""
    
    def __init__(
        self,
        redis_client: redis.Redis,
        requests_per_minute: int = 60,
        requests_per_hour: int = 1000
    ):
        self.redis = redis_client
        self.rpm = requests_per_minute
        self.rph = requests_per_hour
    
    async def check_rate_limit(
        self,
        request: Request,
        identifier: Optional[str] = None
    ) -> bool:
        """Check if request exceeds rate limit."""
        
        # Get identifier (API key, user ID, or IP)
        if not identifier:
            identifier = self._get_client_ip(request)
        
        # Create rate limit keys
        minute_key = f"rate_limit:minute:{identifier}:{int(time.time() / 60)}"
        hour_key = f"rate_limit:hour:{identifier}:{int(time.time() / 3600)}"
        
        # Check minute limit
        minute_count = self.redis.incr(minute_key)
        if minute_count == 1:
            self.redis.expire(minute_key, 60)
        
        if minute_count > self.rpm:
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail=f"Rate limit exceeded: {self.rpm} requests per minute",
                headers={"Retry-After": "60"}
            )
        
        # Check hour limit
        hour_count = self.redis.incr(hour_key)
        if hour_count == 1:
            self.redis.expire(hour_key, 3600)
        
        if hour_count > self.rph:
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail=f"Rate limit exceeded: {self.rph} requests per hour",
                headers={"Retry-After": "3600"}
            )
        
        return True
    
    def _get_client_ip(self, request: Request) -> str:
        """Get client IP address."""
        
        # Check for proxy headers
        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return forwarded.split(",")[0].strip()
        
        real_ip = request.headers.get("X-Real-IP")
        if real_ip:
            return real_ip
        
        return request.client.host

# Usage in FastAPI
rate_limiter = RateLimiter(redis_client)

@app.post("/api/v1/analyze/all")
async def analyze(
    request: Request,
    data: SecureAnalysisRequest,
    api_key: str = Depends(api_key_validator.validate_api_key)
):
    # Check rate limit
    await rate_limiter.check_rate_limit(request, identifier=api_key)
    
    # Process request
    return await process_analysis(data)

Audit Logging

Security Event Logging

# your_service/security/audit.py
from datetime import datetime
from typing import Dict, Any, Optional
import json
import hashlib

class AuditLogger:
    """Log security-relevant events."""
    
    def __init__(self, log_file: str = "/var/log/creativedynamics/audit.log"):
        self.log_file = log_file
    
    async def log_event(
        self,
        event_type: str,
        user_id: Optional[str] = None,
        ip_address: Optional[str] = None,
        details: Dict[str, Any] = None,
        severity: str = "INFO"
    ):
        """Log security event."""
        
        event = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "severity": severity,
            "user_id": user_id,
            "ip_address": ip_address,
            "details": details or {},
            "event_id": self._generate_event_id()
        }
        
        # Log to file
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(event) + '\n')
        
        # Also send to SIEM if configured
        await self._send_to_siem(event)
        
        # Alert on critical events
        if severity == "CRITICAL":
            await self._send_alert(event)
    
    def _generate_event_id(self) -> str:
        """Generate unique event ID."""
        timestamp = str(datetime.utcnow().timestamp())
        random_data = os.urandom(16).hex()
        return hashlib.sha256(f"{timestamp}{random_data}".encode()).hexdigest()[:16]
    
    async def _send_to_siem(self, event: dict):
        """Send event to SIEM system."""
        # Implement SIEM integration
        pass
    
    async def _send_alert(self, event: dict):
        """Send critical security alerts."""
        # Implement alerting (email, Slack, PagerDuty, etc.)
        pass

# Audit log events
audit = AuditLogger()

# Log authentication attempts
await audit.log_event(
    event_type="AUTH_ATTEMPT",
    user_id=username,
    ip_address=client_ip,
    details={"success": False, "reason": "Invalid password"},
    severity="WARNING"
)

# Log data access
await audit.log_event(
    event_type="DATA_ACCESS",
    user_id=current_user.id,
    details={"dataset": dataset_name, "records": record_count},
    severity="INFO"
)

# Log security violations
await audit.log_event(
    event_type="SECURITY_VIOLATION",
    ip_address=client_ip,
    details={"violation": "SQL injection attempt", "query": malicious_query},
    severity="CRITICAL"
)

Vulnerability Management

Dependency Scanning

# .github/workflows/security.yml
name: Security Scan

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 0 * * *'  # Daily scan

jobs:
  security:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
    
    - name: Install dependencies
      run: |
        pip install safety bandit semgrep
    
    - name: Run Safety check
      run: |
        safety check --json --output safety-report.json
    
    - name: Run Bandit security linter
      run: |
        bandit -r creativedynamics/ -f json -o bandit-report.json
    
    - name: Run Semgrep
      run: |
        semgrep --config=auto --json --output=semgrep-report.json creativedynamics/
    
    - name: Upload security reports
      uses: actions/upload-artifact@v3
      with:
        name: security-reports
        path: |
          safety-report.json
          bandit-report.json
          semgrep-report.json
    
    - name: Check for vulnerabilities
      run: |
        python scripts/check_vulnerabilities.py

Security Testing

# tests/test_security.py
import pytest
from fastapi.testclient import TestClient
import time

class TestSecurity:
    """Security-focused tests."""
    
    def test_sql_injection(self, client: TestClient):
        """Test SQL injection prevention."""
        
        malicious_inputs = [
            "'; DROP TABLE users; --",
            "1' OR '1'='1",
            "admin'--",
            "' UNION SELECT * FROM users--"
        ]
        
        for payload in malicious_inputs:
            response = client.post(
                "/api/v1/analyze/all",
                json={"dataset_name": payload, "items": []}
            )
            
            # Should reject malicious input
            assert response.status_code in [400, 422]
            assert "DROP TABLE" not in str(response.content)
    
    def test_xss_prevention(self, client: TestClient):
        """Test XSS prevention."""
        
        xss_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<iframe src='javascript:alert()'>"
        ]
        
        for payload in xss_payloads:
            response = client.post(
                "/api/analyze",
                json={"dataset_name": payload, "items": []}
            )
            
            # Should sanitize or reject
            assert "<script>" not in str(response.content)
            assert "javascript:" not in str(response.content)
    
    def test_rate_limiting(self, client: TestClient):
        """Test rate limiting."""
        
        # Make requests up to limit
        for i in range(60):
            response = client.get("/health")
            assert response.status_code == 200
        
        # Next request should be rate limited
        response = client.get("/health")
        assert response.status_code == 429
        assert "Retry-After" in response.headers
    
    def test_authentication_required(self, client: TestClient):
        """Test authentication enforcement."""
        
        protected_endpoints = [
            "/api/v1/report/export",
        ]
        
        for endpoint in protected_endpoints:
            response = client.get(endpoint)
            assert response.status_code == 401
    
    def test_secure_headers(self, client: TestClient):
        """Test security headers."""
        
        response = client.get("/health")
        
        assert response.headers.get("X-Content-Type-Options") == "nosniff"
        assert response.headers.get("X-Frame-Options") == "DENY"
        assert response.headers.get("X-XSS-Protection") == "1; mode=block"
        assert "Strict-Transport-Security" in response.headers
        assert "Content-Security-Policy" in response.headers

Compliance & Privacy

GDPR Compliance

# your_service/privacy/gdpr.py
from datetime import datetime, timedelta
from typing import Dict, Any
import json

class GDPRCompliance:
    """GDPR compliance utilities."""
    
    async def export_user_data(self, user_id: str) -> Dict[str, Any]:
        """Export all user data (GDPR Article 20)."""
        
        user_data = {
            "user_id": user_id,
            "export_date": datetime.utcnow().isoformat(),
            "personal_data": await self._get_personal_data(user_id),
            "usage_data": await self._get_usage_data(user_id),
            "analysis_history": await self._get_analysis_history(user_id)
        }
        
        # Log data export
        await audit.log_event(
            event_type="GDPR_DATA_EXPORT",
            user_id=user_id,
            severity="INFO"
        )
        
        return user_data
    
    async def delete_user_data(self, user_id: str) -> bool:
        """Delete all user data (GDPR Article 17)."""
        
        # Delete from all systems
        await self._delete_from_database(user_id)
        await self._delete_from_cache(user_id)
        await self._delete_from_logs(user_id)
        
        # Log deletion
        await audit.log_event(
            event_type="GDPR_DATA_DELETION",
            user_id=user_id,
            severity="INFO"
        )
        
        return True
    
    async def anonymize_old_data(self, days: int = 365):
        """Anonymize data older than specified days."""
        
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        
        # Anonymize personal data
        await db.execute("""
            UPDATE analyses 
            SET user_id = 'ANONYMIZED', 
                ip_address = 'ANONYMIZED'
            WHERE created_at < :cutoff_date
        """, {"cutoff_date": cutoff_date})
        
        return True

Security Checklist

Development Phase

  • Implement authentication and authorization

  • Add input validation for all endpoints

  • Enable security headers

  • Configure CORS properly

  • Implement rate limiting

  • Add audit logging

  • Set up dependency scanning

  • Write security tests

Deployment Phase

  • Generate strong secret keys

  • Configure TLS/SSL certificates

  • Set up firewall rules

  • Configure secure database connections

  • Enable encryption at rest

  • Set up intrusion detection

  • Configure backup encryption

  • Implement key rotation

Operational Phase

  • Regular security updates

  • Vulnerability scanning

  • Penetration testing

  • Security audit reviews

  • Incident response plan

  • Security training for team

  • Compliance audits

  • Regular backups and recovery testing

Incident Response

Response Plan

  1. Detection: Monitor security events and alerts

  2. Containment: Isolate affected systems

  3. Investigation: Analyze logs and determine scope

  4. Eradication: Remove threat and patch vulnerabilities

  5. Recovery: Restore systems and verify integrity

  6. Lessons Learned: Document and improve processes

Emergency Contacts

# your_service/security/incident.py
SECURITY_CONTACTS = {
    "security_team": "security@example.com",
    "ciso": "ciso@example.com",
    "legal": "legal@example.com",
    "pr": "pr@example.com",
    "on_call": "+1-xxx-xxx-xxxx"
}

# Incident severity levels
SEVERITY_LEVELS = {
    "CRITICAL": "Data breach, system compromise",
    "HIGH": "Authentication bypass, privilege escalation",
    "MEDIUM": "Failed attack attempts, suspicious activity",
    "LOW": "Policy violations, minor issues"
}

Next Steps