"""Source code for creativedynamics.core.analyzer."""

import logging
import time
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

import numpy as np
import pandas as pd
import tqdm

from creativedynamics.core.analysis_core import detect_change_points
from creativedynamics.core.fatigue_detection import detect_terminal_decline
from creativedynamics.core.wastage import (calculate_cpc_wastage,
                                           calculate_ctr_benchmark,
                                           calculate_ctr_wastage,
                                           detect_double_counting_risk)
from creativedynamics.data.schema import (
    DataSchema, ValidationResult, validate_items_batch
)
from creativedynamics.exceptions import (CreativeDynamicsError,
                                         DataValidationError, ProcessingError,
                                         SignatureCalculationError)
from creativedynamics.reporting.generator import (
    plot_combined_wastage_analysis, plot_item_analysis)

# Set up logging for this module
logger = logging.getLogger(__name__)


# =============================================================================
# Schema Validation Helpers (Phase 0 - Hoisted)
# =============================================================================

def _get_analysis_schema(
    metrics: Tuple[str, ...],
    spend_column: str,
    clicks_column: str,
) -> DataSchema:
    """Build the DataSchema needed to validate items for the given metrics.

    Args:
        metrics: Metric names being analyzed (e.g. 'ctr', 'cpc').
        spend_column: Column holding spend values (needed for CPC analysis).
        clicks_column: Column holding click counts (needed for CPC analysis).

    Returns:
        A DataSchema whose required and numeric columns match the metrics.
    """
    required: set = {"day"}
    numeric: set = set()

    # CTR analysis needs the rate itself plus the impression counts.
    if "ctr" in metrics:
        for col in ("ctr", "impressions"):
            required.add(col)
            numeric.add(col)

    # CPC analysis additionally needs clicks and spend for wastage maths.
    if "cpc" in metrics:
        for col in ("cpc", clicks_column, spend_column):
            required.add(col)
            numeric.add(col)

    return DataSchema(
        required_columns=required,
        date_column="day",
        numeric_columns=numeric,
    )


def _validate_items_upfront(
    item_time_series: Dict[str, pd.DataFrame],
    metrics: Tuple[str, ...],
    spend_column: str,
    clicks_column: str,
) -> Tuple[Dict[str, pd.DataFrame], Dict[str, str]]:
    """Validate every item against the analysis schema before any phase runs.

    Hoisting validation up front means later phases never encounter a frame
    with missing or non-numeric required columns.

    Returns:
        A pair ``(valid_items, invalid_items)`` as produced by
        ``validate_items_batch``.
    """
    outcome = validate_items_batch(
        items=item_time_series,
        schema=_get_analysis_schema(metrics, spend_column, clicks_column),
        fail_fast=False,
    )

    # Surface exclusions loudly so silently-dropped items are noticed.
    if outcome.invalid_count:
        logger.warning(
            f"Schema validation excluded {outcome.invalid_count} items",
            extra={
                "invalid_count": outcome.invalid_count,
                "valid_count": outcome.valid_count,
                "success_rate": f"{outcome.success_rate:.1f}%",
            },
        )

    return outcome.valid_items, outcome.invalid_items


# Removed get_case_insensitive function - all columns are now lowercase


def _phase1_change_detection(
    item_time_series: Dict[str, pd.DataFrame],
    metrics: Tuple[str, ...],
    window_size: int,
    threshold: float,
    signature_depth: int,
    method: str,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Phase 1: Change point detection and trend analysis for all items.
    
    Detects pattern changes using path signatures and classifies segment trends
    for each item across specified metrics.
    
    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        metrics: Tuple of metric names to analyze (e.g., 'ctr', 'cpc')
        window_size: Sliding window size for signature analysis
        threshold: Sensitivity threshold for change point detection
        signature_depth: Truncation depth for signature calculations
        method: Signature computation method (maintained for compatibility)
        
    Returns:
        Dictionary with structure: {item_id: {metric: {analysis_details}}}
        Each analysis_details contains:
            - change_points: List of detected change point indices
            - segment_trends: List of (start, end, trend) tuples
            - overall_trend: Dominant trend classification
            - distances: Signature distance values
            - threshold_value: Actual threshold used
            - pattern_change_detected: Boolean for declining patterns
    """
    # {item_id: {metric: details}}; items where every metric is skipped
    # still get an (empty) entry so later phases can use results.get().
    results = {}

    logger.info("Phase 1: Performing Trend Analysis")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="Trend Analysis"
    ):
        item_metric_results: Dict[str, Dict[str, Any]] = {}
        for metric in metrics:
            logger.debug(f"Analyzing metric '{metric}' for item {item_id_val}")

            # Guard 1: metric column absent — skip this metric for the item.
            if metric not in time_series_data.columns:
                logger.error(
                    f"Metric '{metric}' not found in columns",
                    extra={
                        "metric": metric,
                        "item_id": item_id_val,
                        "available_columns": time_series_data.columns.tolist(),
                    },
                )
                continue

            # Guard 2: column present but entirely null — nothing to analyze.
            if time_series_data[metric].isnull().all():
                logger.error(
                    f"Metric '{metric}' has all null values",
                    extra={"metric": metric, "item_id": item_id_val},
                )
                continue

            try:
                change_points, distances, threshold_value, segment_trends = (
                    detect_change_points(
                        time_series_data,
                        metric=metric,
                        window_size=window_size,
                        threshold=threshold,
                        signature_depth=signature_depth,
                        method=method,
                    )
                )

                # Only segments bounded by real Timestamps with a meaningful
                # classification vote for the overall trend; sentinel labels
                # ('short_segment', 'error', 'insufficient_data') are excluded.
                valid_trends = [
                    trend
                    for s, e, trend in segment_trends
                    if isinstance(s, pd.Timestamp)
                    and isinstance(e, pd.Timestamp)
                    and (e - s).days >= 0
                    and trend not in ["short_segment", "error", "insufficient_data"]
                ]

                # Majority vote across valid segments. NOTE(review): ties are
                # broken by set iteration order, which is arbitrary — confirm
                # this is acceptable for reporting.
                overall_trend = (
                    max(set(valid_trends), key=valid_trends.count)
                    if valid_trends
                    else "undetermined"
                )
            except Exception as e:
                # Detection failure degrades gracefully: this metric gets an
                # empty-but-well-shaped entry so downstream phases never KeyError.
                logger.error(
                    "Error in detect_change_points",
                    exc_info=True,
                    extra={"item_id": item_id_val, "metric": metric, "error": str(e)},
                )
                change_points, distances, threshold_value, segment_trends = (
                    [],
                    [],
                    None,
                    [],
                )
                overall_trend = "error"

            item_metric_results[metric] = {
                "change_points": change_points,
                "segment_trends": segment_trends,
                "overall_trend": overall_trend,
                "start_date": time_series_data["day"].min(),
                "distances": distances,
                "threshold_value": threshold_value,
                # True if ANY timestamp-bounded segment is declining, even
                # when the overall (majority) trend is not 'declining'.
                "pattern_change_detected": any(
                    trend == "declining"
                    for s, e, trend in segment_trends
                    if isinstance(s, pd.Timestamp)
                ),
                "signature_method": method,
            }
        results[item_id_val] = item_metric_results

    return results


def _phase2_cpc_wastage(
    item_time_series: Dict[str, pd.DataFrame],
    results: Dict[str, Dict[str, Dict[str, Any]]],
    spend_column: str,
    clicks_column: str,
) -> Dict[str, float]:
    """Phase 2: CPC wastage calculation for spend efficiency analysis.
    
    Calculates CPC benchmarks and overspending amounts for each item. The
    benchmark is the mean CPC over the longest Phase-1 segment classified as
    'stable' or 'improving'; if no such segment exists, the overall average
    CPC over the full series is used as a fallback.
    
    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        results: Existing results from Phase 1 (modified in-place)
        spend_column: Column name for spend data
        clicks_column: Column name for clicks data
        
    Returns:
        Dictionary mapping item IDs to CPC benchmark values. Only items whose
        benchmark was successfully computed (not NaN) are included.
    """
    cpc_benchmarks: Dict[str, float] = {}

    logger.info("Phase 2: Calculating CPC Wastage")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="CPC Wastage"
    ):
        # Items whose 'cpc' analysis was skipped in Phase 1 cannot be scored.
        if "cpc" not in results.get(item_id_val, {}):
            logger.warning(
                "Skipping CPC wastage - 'cpc' not in results",
                extra={
                    "item_id": item_id_val,
                    "available_metrics": list(results.get(item_id_val, {}).keys()),
                },
            )
            continue

        cpc_results = results[item_id_val]["cpc"]

        # total_spend is reported in both outcome branches below; compute it
        # once here instead of duplicating the expression (DRY fix).
        total_spend = (
            time_series_data[spend_column].sum()
            if spend_column in time_series_data.columns
            else 0
        )

        # Candidate benchmark windows: segments Phase 1 classified as healthy.
        optimal_segments = [
            (s, e, trend)
            for s, e, trend in cpc_results["segment_trends"]
            if trend in ["stable", "improving"]
        ]

        if optimal_segments:
            # Use the longest healthy segment as the benchmark period.
            longest_optimal_segment = max(
                optimal_segments, key=lambda seg: (seg[1] - seg[0]).days
            )
            benchmark_period_start, benchmark_period_end, _ = longest_optimal_segment
            benchmark_data = time_series_data[
                (time_series_data["day"] >= benchmark_period_start)
                & (time_series_data["day"] <= benchmark_period_end)
            ]
            # An empty slice (segment outside the data range) yields NaN and
            # routes to the "missing data" branch below.
            cpc_benchmark = benchmark_data["cpc"].mean() if not benchmark_data.empty else np.nan
        else:
            # Fallback to overall average over the full series.
            cpc_benchmark = time_series_data["cpc"].mean()
            benchmark_period_start = time_series_data["day"].min()
            benchmark_period_end = time_series_data["day"].max()

        if not np.isnan(cpc_benchmark):
            cpc_benchmarks[item_id_val] = cpc_benchmark
            logger.info(
                "CPC benchmark successfully calculated",
                extra={
                    "item_id": item_id_val,
                    "cpc_benchmark": cpc_benchmark,
                    "benchmark_period_start": benchmark_period_start,
                    "benchmark_period_end": benchmark_period_end,
                },
            )

            cpc_wastage_results = calculate_cpc_wastage(
                time_series_data,
                cpc_benchmark,
                benchmark_period_start=benchmark_period_start,
                benchmark_period_end=benchmark_period_end,
                clicks_column=clicks_column,
            )

            results[item_id_val]["cpc"]["spend_efficiency"] = {
                "actual_wastage_gbp": cpc_wastage_results.get("actual_wastage_gbp", 0.0),
                "benchmark_cpc": cpc_wastage_results.get("benchmark_cpc", cpc_benchmark),
                "benchmark_period_start": benchmark_period_start,
                "benchmark_period_end": benchmark_period_end,
                "overspend_periods": cpc_wastage_results.get("overspend_periods", []),
                "calculation_status": cpc_wastage_results.get("calculation_status", "Unknown"),
                "metric_type": "financial",
                "total_spend": total_spend,
            }
        else:
            # Benchmark could not be computed: record a zeroed, clearly
            # labelled placeholder so downstream consumers see why.
            results[item_id_val]["cpc"]["spend_efficiency"] = {
                "actual_wastage_gbp": 0.0,
                "benchmark_cpc": np.nan,
                "benchmark_period_start": None,
                "benchmark_period_end": None,
                "overspend_periods": [],
                "calculation_status": "Missing Required Data: CPC benchmark is NaN",
                "metric_type": "financial",
                "total_spend": total_spend,
            }

    return cpc_benchmarks


def _phase3_ctr_wastage(
    item_time_series: Dict[str, pd.DataFrame],
    results: Dict[str, Dict[str, Dict[str, Any]]],
    cpc_benchmarks: Dict[str, float],
    ctr_benchmark_method: Literal["stable_period", "peak_period", "moving_average"],
    plot_output_dir: Optional[str],
) -> None:
    """Phase 3: CTR wastage calculation for engagement performance analysis.

    Derives a CTR benchmark per item and quantifies engagement lost clicks,
    writing an 'engagement_performance' section into ``results`` in-place.

    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        results: Existing results from Phase 1 & 2 (modified in-place)
        cpc_benchmarks: Dictionary mapping item IDs to CPC benchmark values
        ctr_benchmark_method: Method for CTR benchmark calculation
        plot_output_dir: Optional directory for debug output
    """
    logger.info("Phase 3: Calculating CTR Wastage")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="CTR Wastage"
    ):
        item_entry = results.get(item_id_val, {})
        # Items whose 'ctr' analysis was skipped in Phase 1 cannot be scored.
        if "ctr" not in item_entry:
            continue

        ctr_entry = item_entry["ctr"]
        # NaN when no CPC benchmark exists; downstream treats it as "no GBP
        # reference available".
        cpc_ref = cpc_benchmarks.get(item_id_val, np.nan)

        # Work on a copy so the caller's frames are never mutated; columns
        # are expected to already be lowercase.
        ts = time_series_data.copy()

        has_sample_cols = "ctr" in ts.columns and "impressions" in ts.columns
        sample = (
            ts[["ctr", "impressions"]].head().to_dict() if has_sample_cols else None
        )

        logger.debug(
            "CTR wastage calculation starting",
            extra={
                "item_id": item_id_val,
                "available_columns": ts.columns.tolist(),
                "sample_data": sample,
                "cpc_benchmark_for_ctr": cpc_ref,
            },
        )

        benchmark, bench_start, bench_end, bench_status = calculate_ctr_benchmark(
            ts,
            method=ctr_benchmark_method,
            segment_trends=ctr_entry["segment_trends"],
        )

        logger.debug(
            "CTR benchmark calculated",
            extra={
                "item_id": item_id_val,
                "ctr_benchmark": benchmark,
                "ctr_benchmark_start": bench_start,
                "ctr_benchmark_end": bench_end,
                "ctr_benchmark_status": bench_status,
            },
        )

        wastage = calculate_ctr_wastage(
            ts,
            benchmark,
            cpc_ref,
            ctr_entry["segment_trends"],
            output_dir=plot_output_dir,
        )

        logger.info(
            "CTR wastage calculation completed",
            extra={
                "item_id": item_id_val,
                "ctr_wastage_results": wastage,
            },
        )

        # Clicks are the primary unit; reference_value_gbp is comparison-only.
        ctr_entry["engagement_performance"] = {
            "engagement_lost_clicks": wastage.get("engagement_lost_clicks", 0),
            "ctr_decline_percentage_points": wastage.get("ctr_decline_percentage_points", 0.0),
            "ctr_benchmark": wastage.get("ctr_benchmark", benchmark),
            "ctr_actual_average": wastage.get("ctr_actual_average", 0.0),
            "ctr_benchmark_period_start": bench_start,
            "ctr_benchmark_period_end": bench_end,
            "reference_value_gbp": wastage.get("reference_value_gbp", 0.0),
            "calculation_status": wastage.get("calculation_status", "Unknown"),
            "metric_type": "performance",
            "note": wastage.get("note", "reference_value_gbp is for comparison only"),
        }


def _phase4_finalise_results(
    item_time_series: Dict[str, pd.DataFrame],
    results: Dict[str, Dict[str, Dict[str, Any]]],
    spend_column: str,
    plot_output_dir: Optional[str],
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Phase 4: Finalise results, add metadata, and generate plots.
    
    Adds analysis metadata (double-counting risk, fatigue detection) and
    generates visualisation plots for each item.
    
    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        results: Existing results from Phase 1-3 (modified in-place)
        spend_column: Column name for spend data
        plot_output_dir: Optional directory for plot output
        
    Returns:
        Finalised results dictionary with metadata and plot paths
    """
    logger.info("Phase 4: Finalising and Adding Metadata")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="Finalising"
    ):
        item_metric_results = results.get(item_id_val, {})
        # Both .get chains fall back to {} so the combined-plot check below
        # is simply truthiness of the two sections.
        spend_efficiency = item_metric_results.get("cpc", {}).get(
            "spend_efficiency", {}
        )
        engagement_performance = item_metric_results.get("ctr", {}).get(
            "engagement_performance", {}
        )

        # Add analysis metadata section (NOT combining metrics!)
        # NOTE(review): metadata is only written when 'cpc' was analysed;
        # ctr-only runs get no 'analysis_metadata' entry — confirm intended.
        if "cpc" in item_metric_results:
            actual_overspend = spend_efficiency.get("actual_wastage_gbp", 0)
            engagement_lost_clicks = engagement_performance.get("engagement_lost_clicks", 0)
            cpc_benchmark = spend_efficiency.get("benchmark_cpc", np.nan)
            
            # Calculate double-counting risk with new signature
            if not np.isnan(cpc_benchmark):
                double_counting_risk = detect_double_counting_risk(
                    time_series_data, actual_overspend, engagement_lost_clicks, cpc_benchmark
                )
            else:
                double_counting_risk = "Unknown"

            # Detect terminal decline/creative fatigue for CTR
            ctr_fatigue: Dict[str, Any] = {}
            ctr_benchmark_val = engagement_performance.get("ctr_benchmark", np.nan)
            # NOTE(review): np.isnan raises TypeError if ctr_benchmark is
            # None rather than NaN — confirm calculate_ctr_wastage/
            # calculate_ctr_benchmark never yield None here.
            if "ctr" in item_metric_results and not np.isnan(ctr_benchmark_val):
                ctr_fatigue = detect_terminal_decline(
                    time_series=time_series_data,
                    metric="ctr",
                    benchmark=ctr_benchmark_val,
                    min_decline_days=14,
                )
            
            # Detect terminal decline for CPC
            cpc_fatigue: Dict[str, Any] = {}
            if not np.isnan(cpc_benchmark):
                cpc_fatigue = detect_terminal_decline(
                    time_series=time_series_data,
                    metric="cpc",
                    benchmark=cpc_benchmark,
                    min_decline_days=14,
                )
            
            # Store metadata separately - DO NOT combine metrics
            results[item_id_val]["analysis_metadata"] = {
                "double_counting_risk": double_counting_risk,
                "warning": "Spend efficiency and engagement performance measure different aspects and should not be combined",
                "ctr_fatigue": ctr_fatigue,
                "cpc_fatigue": cpc_fatigue,
                # Replacement is recommended if EITHER metric shows fatigue.
                "overall_replacement_recommended": ctr_fatigue.get("replacement_recommended", False) or cpc_fatigue.get("replacement_recommended", False),
            }

            # Ensure total spend is preserved
            if "total_spend" not in spend_efficiency:
                total_spend = (
                    time_series_data[spend_column].sum()
                    if spend_column in time_series_data.columns
                    else 0
                )
                results[item_id_val]["cpc"]["spend_efficiency"]["total_spend"] = total_spend

            logger.debug(
                "Analysis metadata added",
                extra={
                    "item_id": item_id_val,
                    "actual_wastage_gbp": actual_overspend,
                    "engagement_lost_clicks": engagement_lost_clicks,
                    "double_counting_risk": double_counting_risk,
                },
            )

        # Generate plots for each metric
        for metric, results_data in item_metric_results.items():
            # 'analysis_metadata' is a sibling of metric entries, not a
            # metric itself — it has no plottable change-point data.
            if metric == "analysis_metadata":
                continue
                
            # Get appropriate analysis data based on metric
            if metric == "cpc":
                analysis_data = results_data.get("spend_efficiency")
            elif metric == "ctr":
                analysis_data = results_data.get("engagement_performance")
            else:
                analysis_data = results_data.get("wastage_analysis")
            
            # Direct indexing assumes every metric entry was produced by
            # Phase 1 and carries change_points/distances/etc.
            plot_file = plot_item_analysis(
                item_id_val,
                time_series_data,
                metric=metric,
                change_points=results_data["change_points"],
                distances=results_data["distances"],
                threshold=results_data["threshold_value"],
                segment_trends=results_data["segment_trends"],
                output_dir=plot_output_dir,
                wastage_analysis=analysis_data,
                show_changepoints=False,
            )
            results[item_id_val][metric]["plot_file"] = plot_file

        # Generate combined plot if both metrics available
        if spend_efficiency and engagement_performance:
            combined_plot_file = plot_combined_wastage_analysis(
                item_id_val,
                time_series_data,
                cpc_wastage_analysis=spend_efficiency,
                ctr_wastage_analysis=engagement_performance,
                output_dir=plot_output_dir,
            )
            # Record the same combined plot path under both metric entries.
            if "cpc" in item_metric_results:
                results[item_id_val]["cpc"]["combined_plot"] = combined_plot_file
            if "ctr" in item_metric_results:
                results[item_id_val]["ctr"]["combined_plot"] = combined_plot_file

    return results


def analyze_all_items(
    item_time_series: Dict[str, pd.DataFrame],
    metrics: Tuple[str, ...] = ("ctr", "cpc"),
    window_size: int = 7,
    threshold: float = 1.5,
    signature_depth: int = 4,
    method: str = "auto",
    spend_column: str = "amount_spent_gbp",
    clicks_column: str = "link_clicks",
    plot_output_dir: Optional[str] = None,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Runs the core analysis pipeline on a dictionary of time-series DataFrames.

    This is the main analysis function of the CreativeDynamics library,
    implementing a four-phase pipeline using rough path theory and signature
    methods to detect creative fatigue patterns in advertising performance data:

    1. **Phase 1**: Change point detection using path signatures and trend
       classification.
    2. **Phase 2**: Spend efficiency calculation (CPC overspending analysis).
    3. **Phase 3**: Engagement performance calculation (CTR decline analysis).
    4. **Phase 4**: Results finalisation, metadata addition, and plot generation.

    Args:
        item_time_series: Maps item IDs (creative identifiers) to time-series
            DataFrames sorted by date, each containing a 'day' column plus the
            relevant metric columns ('ctr', 'cpc', 'impressions', etc.).
        metrics: Metric names to analyze; 'ctr' and 'cpc' are supported.
        window_size: Sliding window size (in days) for signature analysis.
            Larger values give more stable detection but may miss short-term
            changes.
        threshold: Sensitivity threshold for change point detection; higher
            values detect only major pattern changes.
        signature_depth: Truncation depth for path signature calculations.
            Higher depths capture more complex patterns at greater cost.
        method: Legacy parameter kept for backward compatibility; the roughpy
            library is always used regardless of this value.
        spend_column: Column name containing spend data in GBP.
        clicks_column: Column name containing click counts.
        plot_output_dir: Optional directory for analysis plots; if None a
            default 'output/plots' structure is used.

    Returns:
        A nested dictionary ``{item_id: {metric: {analysis_details}}}``.
        Per-metric details include change_points, segment_trends,
        overall_trend, distances, threshold_value and
        pattern_change_detected; CPC entries carry a 'spend_efficiency'
        section (actual_wastage_gbp, benchmark_cpc, total_spend, ...) and CTR
        entries an 'engagement_performance' section (engagement_lost_clicks
        primary, reference_value_gbp for comparison only). Each item also
        gains an 'analysis_metadata' section (double_counting_risk, fatigue
        flags) and plot file paths. A special '_pipeline_metadata' key holds
        run timings and coverage.

    Raises:
        CreativeDynamicsError: Base exception for library-specific errors.
        DataValidationError: When input data fails validation requirements.
        SignatureCalculationError: When signature calculations fail.
        ProcessingError: When the pipeline encounters processing failures.

    Example::

        results = analyze_all_items(
            items_dict,
            metrics=('ctr', 'cpc'),
            window_size=7,
            threshold=1.5,
            plot_output_dir='./analysis_plots'
        )
        overspend = results['CREATIVE_001']['cpc']['spend_efficiency']['actual_wastage_gbp']
        risk = results['CREATIVE_001']['analysis_metadata']['double_counting_risk']

    Note:
        Performance scales approximately O(n*m*w) where n=items, m=metrics,
        w=window_size. For large datasets, consider processing in batches or
        increasing window_size to reduce computational overhead.
    """
    # Configuration: CTR benchmark strategy is fixed to peak period for now.
    ctr_benchmark_method: Literal["stable_period", "peak_period", "moving_average"] = (
        "peak_period"
    )

    # Start timing and metrics collection.
    analysis_start_time = time.perf_counter()
    phase_timings: Dict[str, float] = {}
    total_items_input = len(item_time_series)

    logger.info(
        "Analysis pipeline started",
        extra={
            "total_items": total_items_input,
            "metrics": metrics,
            "window_size": window_size,
            "threshold": threshold,
        },
    )

    # --- Phase 0: Schema Validation (Hoisted) ---
    phase0_start = time.perf_counter()
    validated_items, excluded_items = _validate_items_upfront(
        item_time_series=item_time_series,
        metrics=metrics,
        spend_column=spend_column,
        clicks_column=clicks_column,
    )
    phase_timings["phase0_validation"] = time.perf_counter() - phase0_start
    total_items = len(validated_items)
    validation_excluded = len(excluded_items)
    logger.info(
        f"Phase 0 (validation) completed in {phase_timings['phase0_validation']:.2f}s",
        extra={
            "items_validated": total_items,
            "items_excluded": validation_excluded,
        },
    )

    # Only validated items proceed through the pipeline.
    item_time_series = validated_items

    # --- Phase 1: Change Point Detection and Trend Analysis ---
    phase1_start = time.perf_counter()
    results = _phase1_change_detection(
        item_time_series=item_time_series,
        metrics=metrics,
        window_size=window_size,
        threshold=threshold,
        signature_depth=signature_depth,
        method=method,
    )
    phase_timings["phase1_change_detection"] = time.perf_counter() - phase1_start
    logger.info(f"Phase 1 completed in {phase_timings['phase1_change_detection']:.2f}s")

    # Add spend column data to Phase 1 results (needed for backward compatibility).
    for item_id_val, time_series_data in item_time_series.items():
        if item_id_val in results:
            for metric in results[item_id_val]:
                if "total_spend" not in results[item_id_val][metric]:
                    total_spend = (
                        time_series_data[spend_column].sum()
                        if spend_column in time_series_data.columns
                        else 0
                    )
                    results[item_id_val][metric]["total_spend"] = total_spend
                if "wastage_analysis" not in results[item_id_val][metric]:
                    results[item_id_val][metric]["wastage_analysis"] = {}

    # --- Phase 2: CPC Wastage Calculation ---
    phase2_start = time.perf_counter()
    cpc_benchmarks: Dict[str, float] = {}
    if "cpc" in metrics:
        cpc_benchmarks = _phase2_cpc_wastage(
            item_time_series=item_time_series,
            results=results,
            spend_column=spend_column,
            clicks_column=clicks_column,
        )
    phase_timings["phase2_cpc_wastage"] = time.perf_counter() - phase2_start
    logger.info(f"Phase 2 completed in {phase_timings['phase2_cpc_wastage']:.2f}s")

    # --- Phase 3: CTR Wastage Calculation ---
    phase3_start = time.perf_counter()
    if "ctr" in metrics:
        _phase3_ctr_wastage(
            item_time_series=item_time_series,
            results=results,
            cpc_benchmarks=cpc_benchmarks,
            ctr_benchmark_method=ctr_benchmark_method,
            plot_output_dir=plot_output_dir,
        )
    phase_timings["phase3_ctr_wastage"] = time.perf_counter() - phase3_start
    logger.info(f"Phase 3 completed in {phase_timings['phase3_ctr_wastage']:.2f}s")

    # --- Phase 4: Finalise Results and Generate Plots ---
    phase4_start = time.perf_counter()
    results = _phase4_finalise_results(
        item_time_series=item_time_series,
        results=results,
        spend_column=spend_column,
        plot_output_dir=plot_output_dir,
    )
    phase_timings["phase4_finalise"] = time.perf_counter() - phase4_start
    logger.info(f"Phase 4 completed in {phase_timings['phase4_finalise']:.2f}s")

    # Calculate analysis coverage and timing summary.
    total_elapsed = time.perf_counter() - analysis_start_time
    items_analysed = len(results)
    coverage_percentage = (items_analysed / total_items * 100) if total_items > 0 else 0.0

    logger.info(
        "Analysis pipeline completed",
        extra={
            "total_elapsed_seconds": round(total_elapsed, 2),
            "items_input": total_items,
            "items_analysed": items_analysed,
            "coverage_percentage": round(coverage_percentage, 1),
            "phase_timings": {k: round(v, 2) for k, v in phase_timings.items()},
        },
    )

    # Add pipeline metadata to results (accessible for reporting).
    # Note: Using type ignore as _pipeline_metadata is a special key with a
    # different structure from per-item entries.
    results["_pipeline_metadata"] = {  # type: ignore[assignment]
        "total_elapsed_seconds": round(total_elapsed, 2),
        "phase_timings": {k: round(v, 2) for k, v in phase_timings.items()},
        "items_input": total_items,
        "items_analysed": items_analysed,
        "coverage_percentage": round(coverage_percentage, 1),
        "metrics_analysed": list(metrics),
    }

    return results