Scaling Guide

Overview

This guide provides detailed strategies for scaling CreativeDynamics to handle increased load, larger datasets, and concurrent users. It covers both vertical and horizontal scaling approaches, optimisation techniques, and architectural patterns for high-performance deployments.

Scaling Dimensions

1. Request Volume Scaling

  • Current Capacity: ~1,000 requests/minute on a single instance

  • Target Capacity: 10,000+ requests/minute with horizontal scaling

  • Strategy: Load balancing across multiple API instances

2. Data Volume Scaling

  • Current Limit: ~100MB per analysis

  • Target Support: 1GB+ datasets

  • Strategy: Streaming processing and chunked analysis

3. Concurrent Analysis Scaling

  • Current: Sequential processing

  • Target: Parallel processing with job queues

  • Strategy: Distributed task processing with Celery/RabbitMQ

Horizontal Scaling Architecture

Multi-Instance Deployment

# docker-compose-scaled.yml
version: '3.8'

services:
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api1
      - api2
      - api3

  api1:
    build: .
    environment:
      - INSTANCE_ID=1
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres

  api2:
    build: .
    environment:
      - INSTANCE_ID=2
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres

  api3:
    build: .
    environment:
      - INSTANCE_ID=3
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=creativedynamics
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

  worker:
    build: .
    command: celery -A your_service.tasks.celery_app worker --loglevel=info
    environment:
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://user:pass@postgres:5432/creativedynamics
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3

volumes:
  redis_data:
  postgres_data:

Nginx Load Balancer Configuration

# nginx.conf
upstream creativedynamics_api {
    least_conn;  # Use least connections load balancing
    server api1:5001 max_fails=3 fail_timeout=30s;
    server api2:5001 max_fails=3 fail_timeout=30s;
    server api3:5001 max_fails=3 fail_timeout=30s;
    
    # Keep idle connections to upstream servers open for reuse
    keepalive 32;
}

server {
    listen 80;
    server_name _;
    
    # API endpoints
    location / {
        proxy_pass http://creativedynamics_api;
        proxy_http_version 1.1;
        # Note: forcing Connection "upgrade" disables upstream keepalive;
        # these two headers are only needed if the API serves WebSockets
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Connection pooling
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
        
        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
        proxy_busy_buffers_size 8k;
    }
    
    # Health check endpoint
    location /health {
        access_log off;
        proxy_pass http://creativedynamics_api/health;
    }
}
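The `least_conn` directive sends each new request to the upstream peer with the fewest active connections. As a rough illustration of that selection logic (server names match the upstream block above; this is a sketch, not how nginx is implemented):

```python
def pick_least_conn(active_conns):
    """Return the backend with the fewest active connections.

    Ties go to the first-listed peer; nginx additionally round-robins
    among equally loaded peers.
    """
    return min(active_conns, key=active_conns.get)

# Three upstreams with differing load
conns = {"api1": 12, "api2": 4, "api3": 9}
print(pick_least_conn(conns))  # → api2
```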

Distributed Task Processing

Celery Configuration

# your_service/tasks/celery_app.py
from celery import Celery
from kombu import Queue
import os

# Create Celery instance
app = Celery('creativedynamics')

# Configuration
app.conf.update(
    broker_url=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    result_backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
    
    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    
    # Performance tuning
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
    task_acks_late=True,
    
    # Task routing
    task_routes={
        'your_service.tasks.analysis.*': {'queue': 'analysis'},
        'your_service.tasks.signature.*': {'queue': 'signature'},
        'your_service.tasks.report.*': {'queue': 'report'},
    },
    
    # Queue configuration
    task_queues=(
        Queue('analysis', routing_key='analysis.#', priority=10),
        Queue('signature', routing_key='signature.#', priority=5),
        Queue('report', routing_key='report.#', priority=1),
    ),
    
    # Result expiration
    result_expires=3600,  # 1 hour
    
    # Rate limiting
    task_annotations={
        'your_service.tasks.analysis.heavy_analysis': {
            'rate_limit': '10/m',  # 10 per minute
        },
    },
)
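The glob patterns in `task_routes` map task names onto queues. The matching behaviour can be sketched with the stdlib `fnmatch` (Celery resolves these patterns similarly; `'celery'` is its default queue, and `route_task` is a standalone helper, not part of the Celery API):

```python
from fnmatch import fnmatch

TASK_ROUTES = {
    'your_service.tasks.analysis.*': 'analysis',
    'your_service.tasks.signature.*': 'signature',
    'your_service.tasks.report.*': 'report',
}

def route_task(task_name: str, default: str = 'celery') -> str:
    """Return the queue a task name would be routed to."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return queue
    return default

print(route_task('your_service.tasks.analysis.analyze'))  # → analysis
print(route_task('your_service.tasks.cleanup.purge'))     # → celery
```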

Async Task Implementation

# your_service/tasks/analysis.py
import json
import logging

from celery import Task, group
from typing import Any, Dict, List

from creativedynamics.core.analyzer import analyze_all_items
from your_service.tasks.celery_app import app
# redis_client, send_alert and task_completion_counter are assumed to be
# provided elsewhere in your_service (cache, alerting and metrics helpers)

logger = logging.getLogger(__name__)

class AnalysisTask(Task):
    """Base task with automatic retries and error handling."""
    
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3, 'countdown': 5}
    track_started = True
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        logger.error(f"Task {task_id} failed: {exc}")
        # Send alert notification
        send_alert(f"Analysis task failed: {task_id}")
    
    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success."""
        logger.info(f"Task {task_id} completed successfully")
        # Update metrics
        task_completion_counter.inc()

@app.task(base=AnalysisTask, bind=True, name='your_service.tasks.analysis.analyze')
def analyze_async(
    self,
    data: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """Asynchronous analysis task."""
    
    # Update task state (bind=True gives us the task instance as self)
    self.update_state(
        state='PROCESSING',
        meta={'current': 0, 'total': len(data.get('items', []))}
    )
    
    # Perform analysis
    results = analyze_all_items(data, config)
    
    # Cache results keyed by this task's id
    cache_key = f"analysis:{self.request.id}"
    redis_client.setex(cache_key, 3600, json.dumps(results))
    
    return results

@app.task(name='your_service.tasks.analysis.batch_analyze')
def batch_analyze(
    batch_data: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[str]:
    """Process multiple analyses in parallel."""
    
    # Create subtasks
    job = group(
        analyze_async.s(data, config) 
        for data in batch_data
    )
    
    # Execute in parallel
    result = job.apply_async()
    
    return [r.id for r in result.results]
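Outside of Celery, the same fan-out shape can be sketched with a thread pool; `analyze_one` here is a hypothetical stand-in for a single analysis task, and the whole block only illustrates the pattern that `group(...).apply_async()` distributes across workers:

```python
from concurrent.futures import ThreadPoolExecutor

def analyze_one(item):
    # Stand-in for a single analysis task
    return {"id": item["id"], "score": item["value"] * 2}

def batch_analyze_local(batch, max_workers=4):
    """Fan a batch out across worker threads and gather the results
    in input order, mirroring a Celery group across workers."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(analyze_one, batch))

batch = [{"id": i, "value": i} for i in range(5)]
results = batch_analyze_local(batch)
print([r["score"] for r in results])  # → [0, 2, 4, 6, 8]
```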

Database Scaling

Connection Pooling

# your_service/db/connection.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
import os

def create_db_engine():
    """Create database engine with connection pooling."""
    
    database_url = os.getenv('DATABASE_URL')
    
    engine = create_engine(
        database_url,
        # Connection pool settings
        poolclass=QueuePool,
        pool_size=20,                # Number of persistent connections
        max_overflow=10,              # Maximum overflow connections
        pool_timeout=30,              # Timeout for getting connection
        pool_recycle=3600,           # Recycle connections after 1 hour
        pool_pre_ping=True,          # Test connections before using
        
        # Performance settings
        echo=False,                  # Disable SQL logging in production
        connect_args={
            "connect_timeout": 10,
            "application_name": "creativedynamics",
            "options": "-c statement_timeout=30000"  # 30 second timeout
        }
    )
    
    return engine

# Global connection pool
engine = create_db_engine()
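The pool parameters interact: up to `pool_size + max_overflow` connections can exist at once, and `pool_timeout` bounds how long a caller waits for a free one. A toy sketch of those checkout semantics (connection objects are just placeholder strings; real pooling is handled by SQLAlchemy's `QueuePool`):

```python
import queue

class ToyPool:
    """Illustrates pool_size / max_overflow / timeout semantics only."""

    def __init__(self, pool_size=2, max_overflow=1, timeout=0.1):
        self._idle = queue.Queue()
        self._capacity = pool_size + max_overflow  # hard upper bound
        self._created = 0
        self._timeout = timeout

    def checkout(self):
        try:
            # Prefer reusing an idle connection
            return self._idle.get_nowait()
        except queue.Empty:
            if self._created < self._capacity:
                self._created += 1
                return f"conn-{self._created}"  # placeholder connection
            # All connections busy: wait up to the timeout for a check-in
            return self._idle.get(timeout=self._timeout)

    def checkin(self, conn):
        self._idle.put(conn)

pool = ToyPool(pool_size=2, max_overflow=1)
c1, c2, c3 = pool.checkout(), pool.checkout(), pool.checkout()
# A fourth checkout would now wait, then raise queue.Empty
pool.checkin(c1)
print(pool.checkout())  # → conn-1 (reused, not newly created)
```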

Read Replica Configuration

# your_service/db/replicas.py
import os
from random import choice

from sqlalchemy import create_engine

class DatabaseRouter:
    """Route database queries to appropriate instances."""
    
    def __init__(self):
        self.master = create_engine(os.getenv('DATABASE_MASTER_URL'))
        self.replicas = [
            create_engine(os.getenv(f'DATABASE_REPLICA_{i}_URL'))
            for i in range(1, 4)
        ]
    
    def get_read_engine(self):
        """Get a read replica connection."""
        return choice(self.replicas)
    
    def get_write_engine(self):
        """Get master connection for writes."""
        return self.master
    
    def execute_read(self, query):
        """Execute read query on replica."""
        engine = self.get_read_engine()
        with engine.connect() as conn:
            return conn.execute(query)
    
    def execute_write(self, query):
        """Execute write query on master."""
        with self.master.connect() as conn:
            return conn.execute(query)

# Global router
db_router = DatabaseRouter()

Caching Strategy

Multi-Layer Caching

# your_service/cache/manager.py
import asyncio
import hashlib
import json
import os
import pickle
import socket
from functools import wraps
from typing import Any, Optional

import redis

class CacheManager:
    """Multi-layer caching with Redis and local memory."""
    
    def __init__(self):
        # Redis connection; keyword arguments are forwarded to the
        # underlying ConnectionPool
        self.redis = redis.Redis(
            host=os.getenv('REDIS_HOST', 'localhost'),
            port=int(os.getenv('REDIS_PORT', 6379)),
            db=0,
            decode_responses=False,
            max_connections=50,
            socket_keepalive=True,
            # TCP keepalive tuning (these constants are Linux-specific)
            socket_keepalive_options={
                socket.TCP_KEEPIDLE: 1,
                socket.TCP_KEEPINTVL: 3,
                socket.TCP_KEEPCNT: 5,
            }
        )
        
        # Local memory cache (LRU)
        self.local_cache = {}
        self.max_local_size = 1000
    
    def _generate_key(self, prefix: str, params: dict) -> str:
        """Generate cache key from parameters."""
        param_str = json.dumps(params, sort_keys=True)
        hash_val = hashlib.md5(param_str.encode()).hexdigest()
        return f"{prefix}:{hash_val}"
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache (local first, then Redis)."""
        
        # Check local cache
        if key in self.local_cache:
            return self.local_cache[key]
        
        # Check Redis
        value = self.redis.get(key)
        if value:
            deserialized = pickle.loads(value)
            # Store in local cache
            self._update_local_cache(key, deserialized)
            return deserialized
        
        return None
    
    def set(self, key: str, value: Any, ttl: int = 3600):
        """Set value in both cache layers."""
        
        # Serialize for Redis
        serialized = pickle.dumps(value)
        
        # Store in Redis with TTL
        self.redis.setex(key, ttl, serialized)
        
        # Store in local cache
        self._update_local_cache(key, value)
    
    def _update_local_cache(self, key: str, value: Any):
        """Update local cache with LRU eviction."""
        
        if len(self.local_cache) >= self.max_local_size:
            # Remove oldest item (simple FIFO for demonstration)
            oldest_key = next(iter(self.local_cache))
            del self.local_cache[oldest_key]
        
        self.local_cache[key] = value
    
    def cache_result(self, prefix: str, ttl: int = 3600):
        """Decorator for caching function results."""
        
        def decorator(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self._generate_key(
                    prefix,
                    {'args': args, 'kwargs': kwargs}
                )
                
                # Check cache
                cached = self.get(cache_key)
                if cached is not None:
                    return cached
                
                # Execute function
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    result = func(*args, **kwargs)
                
                # Cache result
                self.set(cache_key, result, ttl)
                
                return result
            
            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self._generate_key(
                    prefix,
                    {'args': args, 'kwargs': kwargs}
                )
                
                # Check cache
                cached = self.get(cache_key)
                if cached is not None:
                    return cached
                
                # Execute function
                result = func(*args, **kwargs)
                
                # Cache result
                self.set(cache_key, result, ttl)
                
                return result
            
            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        
        return decorator

# Global cache manager
cache = CacheManager()

# Usage example
@cache.cache_result('signature_calc', ttl=7200)
def calculate_expensive_signature(data):
    """Expensive calculation that benefits from caching."""
    # ... complex computation ...
    return result
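The local layer above evicts FIFO for brevity; a true LRU must also promote entries on read. A small sketch with `OrderedDict` of the logic that could back `_update_local_cache` (the class name and sizes are illustrative):

```python
from collections import OrderedDict

class LRUCache:
    """Least-recently-used local cache; get() refreshes recency."""

    def __init__(self, max_size=3):
        self._data = OrderedDict()
        self._max_size = max_size

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def set(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self._max_size:
            self._data.popitem(last=False)  # drop least recently used

cache = LRUCache(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")         # "a" is now most recent
cache.set("c", 3)      # evicts "b", not "a"
print(cache.get("b"))  # → None
print(cache.get("a"))  # → 1
```

Under FIFO the `set("c", 3)` call would have evicted `"a"` instead, even though it was just read; that difference is what makes LRU worth the extra bookkeeping for hot keys.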

Performance Optimisation

Request Batching

# your_service/api/batch.py
import asyncio
import uuid
from collections import defaultdict
from typing import Any, Dict, List

class BatchProcessor:
    """Batch multiple requests for efficient processing."""
    
    def __init__(self, batch_size: int = 10, batch_timeout: float = 0.1):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending_requests = defaultdict(list)
        self.locks = defaultdict(asyncio.Lock)
    
    async def add_request(
        self,
        batch_key: str,
        request_data: Dict[str, Any]
    ) -> Any:
        """Add request to batch and wait for result."""
        
        request_id = str(uuid.uuid4())
        future = asyncio.Future()
        
        async with self.locks[batch_key]:
            self.pending_requests[batch_key].append({
                'id': request_id,
                'data': request_data,
                'future': future
            })
            
            # Check if batch is ready
            if len(self.pending_requests[batch_key]) >= self.batch_size:
                await self._process_batch(batch_key)
            else:
                # Schedule a flush after the timeout, but only once per
                # batch (when its first request arrives)
                if len(self.pending_requests[batch_key]) == 1:
                    asyncio.create_task(
                        self._process_batch_delayed(batch_key)
                    )
        
        return await future
    
    async def _process_batch_delayed(self, batch_key: str):
        """Process batch after timeout."""
        await asyncio.sleep(self.batch_timeout)
        async with self.locks[batch_key]:
            if self.pending_requests[batch_key]:
                await self._process_batch(batch_key)
    
    async def _process_batch(self, batch_key: str):
        """Process all pending requests in batch."""
        
        if not self.pending_requests[batch_key]:
            return
        
        batch = self.pending_requests[batch_key]
        self.pending_requests[batch_key] = []
        
        try:
            # Process entire batch
            batch_data = [req['data'] for req in batch]
            results = await self._execute_batch(batch_data)
            
            # Distribute results
            for req, result in zip(batch, results):
                req['future'].set_result(result)
        
        except Exception as e:
            # Set exception for all requests
            for req in batch:
                req['future'].set_exception(e)
    
    async def _execute_batch(self, batch_data: List[Dict[str, Any]]):
        """Execute batch processing logic."""
        # Implement actual batch processing
        return await process_batch_analysis(batch_data)
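A self-contained run of the batching pattern shows several callers sharing one batched call. This miniature version uses a stub "execute" step that just doubles each value, and omits the timeout flush for brevity:

```python
import asyncio

class TinyBatcher:
    """Minimal batcher: collect futures, resolve them all from one
    batched execution once the batch is full."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.pending = []

    async def add(self, value):
        fut = asyncio.get_running_loop().create_future()
        self.pending.append((value, fut))
        if len(self.pending) >= self.batch_size:
            batch, self.pending = self.pending, []
            # One batched call serves every waiter (stub: double each)
            results = [v * 2 for v, _ in batch]
            for (_, f), r in zip(batch, results):
                f.set_result(r)
        return await fut

async def main():
    batcher = TinyBatcher(batch_size=3)
    return await asyncio.gather(
        batcher.add(1), batcher.add(2), batcher.add(3)
    )

print(asyncio.run(main()))  # → [2, 4, 6]
```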

Data Streaming

# creativedynamics/streaming/processor.py
import asyncio
from typing import Any, AsyncIterator, Dict

import aiofiles
import numpy as np
import pandas as pd

# calculate_path_signatures and detect_fatigue_patterns are assumed to
# be importable from elsewhere in the creativedynamics package

class StreamProcessor:
    """Process large datasets in streaming fashion."""
    
    def __init__(self, chunk_size: int = 10000):
        self.chunk_size = chunk_size
    
    async def process_file_stream(
        self,
        file_path: str
    ) -> AsyncIterator[Dict[str, Any]]:
        """Process large file in chunks."""
        
        async with aiofiles.open(file_path, mode='r') as file:
            # Read header
            header = await file.readline()
            columns = header.strip().split(',')
            
            chunk_data = []
            async for line in file:
                chunk_data.append(line.strip().split(','))
                
                if len(chunk_data) >= self.chunk_size:
                    # Process chunk
                    df = pd.DataFrame(chunk_data, columns=columns)
                    result = await self._process_chunk(df)
                    yield result
                    chunk_data = []
            
            # Process remaining data
            if chunk_data:
                df = pd.DataFrame(chunk_data, columns=columns)
                result = await self._process_chunk(df)
                yield result
    
    async def _process_chunk(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Process a single chunk of data."""
        
        # Calculate signatures for chunk
        signatures = calculate_path_signatures(df)
        
        # Detect patterns
        patterns = detect_fatigue_patterns(signatures)
        
        return {
            'chunk_size': len(df),
            'signatures': signatures,
            'patterns': patterns,
            # optional per-chunk summary metrics; merge_results below
            # averages whatever appears here
            'metrics': {}
        }
    
    async def merge_results(
        self,
        results: AsyncIterator[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Merge results from all chunks."""
        
        merged = {
            'total_records': 0,
            'all_patterns': [],
            'aggregated_metrics': {}
        }
        
        async for chunk_result in results:
            merged['total_records'] += chunk_result['chunk_size']
            merged['all_patterns'].extend(chunk_result['patterns'])
            
            # Aggregate metrics
            for key, value in chunk_result.get('metrics', {}).items():
                if key not in merged['aggregated_metrics']:
                    merged['aggregated_metrics'][key] = []
                merged['aggregated_metrics'][key].append(value)
        
        # Finalize aggregation
        for key in merged['aggregated_metrics']:
            merged['aggregated_metrics'][key] = np.mean(
                merged['aggregated_metrics'][key]
            )
        
        return merged
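The chunking loop can be exercised without aiofiles; a synchronous equivalent with the stdlib `csv` module shows the same read-a-chunk / yield-a-result shape (and, unlike naive `split(',')`, handles quoted fields):

```python
import csv
import io

def iter_chunks(fileobj, chunk_size=2):
    """Yield lists of row dicts, at most chunk_size rows each."""
    reader = csv.DictReader(fileobj)
    chunk = []
    for row in reader:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:  # flush the final partial chunk
        yield chunk

data = io.StringIO("id,clicks\na,10\nb,20\nc,30\n")
sizes = [len(chunk) for chunk in iter_chunks(data, chunk_size=2)]
print(sizes)  # → [2, 1]
```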

Auto-Scaling Configuration

Kubernetes HPA

# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: creativedynamics-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: creativedynamics
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: request_rate
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
      - type: Pods
        value: 2
        periodSeconds: 60
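The HPA derives its target from the ratio of current to desired metric value: desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric). A quick check against the 70% CPU target above (ignoring stabilization windows and min/max clamping):

```python
from math import ceil

def desired_replicas(current_replicas, current_metric, target_metric):
    """Core HPA scaling formula, before stabilization and clamping."""
    return ceil(current_replicas * current_metric / target_metric)

# 3 pods averaging 95% CPU against a 70% target → scale to 5
print(desired_replicas(3, 95, 70))  # → 5
# At 40% average CPU the same pods scale down to 2
print(desired_replicas(3, 40, 70))  # → 2
```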

AWS Auto Scaling

# terraform/autoscaling.tf
resource "aws_autoscaling_group" "creativedynamics" {
  name                = "creativedynamics-asg"
  min_size            = 2
  max_size            = 10
  desired_capacity    = 3
  health_check_type   = "ELB"
  health_check_grace_period = 300
  
  launch_template {
    id      = aws_launch_template.creativedynamics.id
    version = "$Latest"
  }
  
  target_group_arns = [aws_lb_target_group.creativedynamics.arn]
  
  tag {
    key                 = "Name"
    value               = "creativedynamics-instance"
    propagate_at_launch = true
  }
}

resource "aws_autoscaling_policy" "scale_up" {
  name                   = "creativedynamics-scale-up"
  scaling_adjustment     = 2
  adjustment_type        = "ChangeInCapacity"
  cooldown              = 300
  autoscaling_group_name = aws_autoscaling_group.creativedynamics.name
}

resource "aws_autoscaling_policy" "scale_down" {
  name                   = "creativedynamics-scale-down"
  scaling_adjustment     = -1
  adjustment_type        = "ChangeInCapacity"
  cooldown              = 300
  autoscaling_group_name = aws_autoscaling_group.creativedynamics.name
}

resource "aws_cloudwatch_metric_alarm" "high_cpu" {
  alarm_name          = "creativedynamics-high-cpu"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name        = "CPUUtilization"
  namespace          = "AWS/EC2"
  period             = "120"
  statistic          = "Average"
  threshold          = "70"
  alarm_description  = "This metric monitors CPU utilization"
  alarm_actions      = [aws_autoscaling_policy.scale_up.arn]
}

Load Testing

Locust Configuration

# load_test/locustfile.py
from locust import HttpUser, task, between
import random

class CreativeDynamicsUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        """Initialize user session."""
        self.client.verify = False
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer test-token'
        }
    
    @task(3)
    def health_check(self):
        """Simple health check."""
        self.client.get("/health")
    
    @task(10)
    def analyze_small(self):
        """Submit small analysis job."""
        data = self.generate_test_data(items=10)
        self.client.post(
            "/api/analyze",
            json=data,
            headers=self.headers
        )
    
    @task(5)
    def analyze_medium(self):
        """Submit medium analysis job."""
        data = self.generate_test_data(items=100)
        self.client.post(
            "/api/analyze",
            json=data,
            headers=self.headers
        )
    
    @task(1)
    def analyze_large(self):
        """Submit large analysis job."""
        data = self.generate_test_data(items=1000)
        self.client.post(
            "/api/analyze",
            json=data,
            headers=self.headers,
            timeout=60
        )
    
    def generate_test_data(self, items: int):
        """Generate test data for analysis."""
        return {
            'dataset_name': f'test_dataset_{random.randint(1, 1000)}',
            'items': [
                {
                    'id': f'item_{i}',
                    'metrics': {
                        'impressions': random.randint(1000, 100000),
                        'clicks': random.randint(10, 1000),
                        'ctr': random.uniform(0.001, 0.1),
                        'spend': random.uniform(10, 1000)
                    },
                    'timestamp': '2024-01-01T00:00:00Z'
                }
                for i in range(items)
            ]
        }

# Run with: locust -f locustfile.py --host=http://localhost:5001

Scaling Checklist

Pre-Scaling Requirements

  • Implement connection pooling

  • Set up caching layer (Redis)

  • Configure load balancer

  • Implement health checks

  • Set up monitoring and metrics

  • Configure auto-scaling policies

  • Implement rate limiting

  • Set up CDN for static assets

Scaling Milestones

  • Phase 1: 2-3 instances, 100 concurrent users

  • Phase 2: 5-10 instances, 500 concurrent users

  • Phase 3: 10-20 instances, 1000+ concurrent users

  • Phase 4: Multi-region deployment

Performance Targets

  • API response time: < 200ms (p95)

  • Analysis completion: < 30s for standard datasets

  • Throughput: 10,000+ requests/minute

  • Availability: 99.9% uptime
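The p95 target means 95% of requests must complete within the bound. One simple way to measure it from collected latency samples (nearest-rank method; monitoring stacks typically compute this for you):

```python
def p95(latencies_ms):
    """Nearest-rank 95th percentile of a list of latency samples."""
    ordered = sorted(latencies_ms)
    # ceil(0.95 * n) via integer arithmetic to avoid float rounding
    rank = -(-95 * len(ordered) // 100)
    return ordered[rank - 1]

samples = list(range(1, 101))  # 1..100 ms
print(p95(samples))  # → 95
```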

Next Steps