import logging
import time
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import numpy as np
import pandas as pd
import tqdm
from creativedynamics.core.analysis_core import detect_change_points
from creativedynamics.core.fatigue_detection import detect_terminal_decline
from creativedynamics.core.wastage import (calculate_cpc_wastage,
calculate_ctr_benchmark,
calculate_ctr_wastage,
detect_double_counting_risk)
from creativedynamics.data.schema import (
DataSchema, ValidationResult, validate_items_batch
)
from creativedynamics.exceptions import (CreativeDynamicsError,
DataValidationError, ProcessingError,
SignatureCalculationError)
from creativedynamics.reporting.generator import (
plot_combined_wastage_analysis, plot_item_analysis)
# Set up logging for this module
logger = logging.getLogger(__name__)
# =============================================================================
# Schema Validation Helpers (Phase 0 - Hoisted)
# =============================================================================
def _get_analysis_schema(
    metrics: Tuple[str, ...],
    spend_column: str,
    clicks_column: str,
) -> DataSchema:
    """Build the DataSchema describing the columns the analysis requires.

    The 'day' date column is always mandatory; each requested metric adds
    its own set of required (and numeric) columns.
    """
    # Map each supported metric to the columns it needs.
    columns_by_metric = {
        "ctr": {"ctr", "impressions"},
        "cpc": {"cpc", clicks_column, spend_column},
    }
    required_cols = {"day"}
    numeric_cols: set = set()
    for metric_name, cols in columns_by_metric.items():
        if metric_name in metrics:
            required_cols |= cols
            numeric_cols |= cols
    return DataSchema(
        required_columns=required_cols,
        date_column="day",
        numeric_columns=numeric_cols,
    )
def _validate_items_upfront(
    item_time_series: Dict[str, pd.DataFrame],
    metrics: Tuple[str, ...],
    spend_column: str,
    clicks_column: str,
) -> Tuple[Dict[str, pd.DataFrame], Dict[str, str]]:
    """Validate all items upfront before analysis (hoisted validation).

    Runs batch schema validation (non-fail-fast) against the schema derived
    from the requested metrics and returns the valid and invalid item maps.
    """
    validation_result = validate_items_batch(
        items=item_time_series,
        schema=_get_analysis_schema(metrics, spend_column, clicks_column),
        fail_fast=False,
    )
    excluded = validation_result.invalid_count
    if excluded > 0:
        # Exclusions are logged (not raised) so the rest of the pipeline runs.
        logger.warning(
            f"Schema validation excluded {validation_result.invalid_count} items",
            extra={
                "invalid_count": validation_result.invalid_count,
                "valid_count": validation_result.valid_count,
                "success_rate": f"{validation_result.success_rate:.1f}%",
            }
        )
    return validation_result.valid_items, validation_result.invalid_items
# Removed get_case_insensitive function - all columns are now lowercase
def _phase1_change_detection(
    item_time_series: Dict[str, pd.DataFrame],
    metrics: Tuple[str, ...],
    window_size: int,
    threshold: float,
    signature_depth: int,
    method: str,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Phase 1: Change point detection and trend analysis for all items.

    Detects pattern changes using path signatures and classifies segment trends
    for each item across specified metrics. A metric that is missing from an
    item's DataFrame, or is entirely null, is skipped for that item (logged as
    an error). A failure inside ``detect_change_points`` is caught and recorded
    as ``overall_trend == "error"`` with empty outputs rather than aborting
    the whole run.

    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        metrics: Tuple of metric names to analyze (e.g., 'ctr', 'cpc')
        window_size: Sliding window size for signature analysis
        threshold: Sensitivity threshold for change point detection
        signature_depth: Truncation depth for signature calculations
        method: Signature computation method (maintained for compatibility)

    Returns:
        Dictionary with structure: {item_id: {metric: {analysis_details}}}

        Each analysis_details contains:
        - change_points: List of detected change point indices
        - segment_trends: List of (start, end, trend) tuples
        - overall_trend: Dominant trend classification
        - distances: Signature distance values
        - threshold_value: Actual threshold used
        - pattern_change_detected: Boolean for declining patterns
    """
    results: Dict[str, Dict[str, Dict[str, Any]]] = {}
    logger.info("Phase 1: Performing Trend Analysis")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="Trend Analysis"
    ):
        item_metric_results: Dict[str, Dict[str, Any]] = {}
        for metric in metrics:
            logger.debug(f"Analyzing metric '{metric}' for item {item_id_val}")
            # Guard: metric column must exist and contain at least one value.
            if metric not in time_series_data.columns:
                logger.error(
                    f"Metric '{metric}' not found in columns",
                    extra={
                        "metric": metric,
                        "item_id": item_id_val,
                        "available_columns": time_series_data.columns.tolist(),
                    },
                )
                continue
            if time_series_data[metric].isnull().all():
                logger.error(
                    f"Metric '{metric}' has all null values",
                    extra={"metric": metric, "item_id": item_id_val},
                )
                continue
            try:
                change_points, distances, threshold_value, segment_trends = (
                    detect_change_points(
                        time_series_data,
                        metric=metric,
                        window_size=window_size,
                        threshold=threshold,
                        signature_depth=signature_depth,
                        method=method,
                    )
                )
                # The overall trend is a majority vote over well-formed
                # segments only: Timestamp bounds in order (end >= start) and
                # a substantive trend label (placeholder labels excluded).
                valid_trends = [
                    trend
                    for s, e, trend in segment_trends
                    if isinstance(s, pd.Timestamp)
                    and isinstance(e, pd.Timestamp)
                    and (e - s).days >= 0
                    and trend not in ["short_segment", "error", "insufficient_data"]
                ]
                overall_trend = (
                    max(set(valid_trends), key=valid_trends.count)
                    if valid_trends
                    else "undetermined"
                )
            except Exception as e:
                # Record the failure with empty placeholder outputs so the
                # result shape stays consistent for downstream phases.
                logger.error(
                    "Error in detect_change_points",
                    exc_info=True,
                    extra={"item_id": item_id_val, "metric": metric, "error": str(e)},
                )
                change_points, distances, threshold_value, segment_trends = (
                    [],
                    [],
                    None,
                    [],
                )
                overall_trend = "error"
            item_metric_results[metric] = {
                "change_points": change_points,
                "segment_trends": segment_trends,
                "overall_trend": overall_trend,
                "start_date": time_series_data["day"].min(),
                "distances": distances,
                "threshold_value": threshold_value,
                # True when any segment with a Timestamp start was classified
                # 'declining' (malformed segments are ignored here).
                "pattern_change_detected": any(
                    trend == "declining"
                    for s, e, trend in segment_trends
                    if isinstance(s, pd.Timestamp)
                ),
                "signature_method": method,
            }
        results[item_id_val] = item_metric_results
    return results
def _total_item_spend(time_series_data: pd.DataFrame, spend_column: str) -> float:
    """Return the item's total spend, or 0 when the spend column is absent."""
    if spend_column in time_series_data.columns:
        return time_series_data[spend_column].sum()
    return 0


def _select_cpc_benchmark(
    time_series_data: pd.DataFrame,
    segment_trends: List[Tuple[Any, Any, str]],
) -> Tuple[float, Any, Any]:
    """Choose the CPC benchmark value and the period it was derived from.

    Prefers the mean CPC over the longest segment Phase 1 classified as
    'stable' or 'improving'; falls back to the overall mean CPC across the
    whole series. The returned benchmark may be NaN (empty benchmark window
    or all-null CPC values).

    Returns:
        Tuple of (benchmark_cpc, period_start, period_end).
    """
    optimal_segments = [
        (s, e, trend)
        for s, e, trend in segment_trends
        if trend in ["stable", "improving"]
    ]
    if not optimal_segments:
        # Fallback: overall average across the full observed period.
        return (
            time_series_data["cpc"].mean(),
            time_series_data["day"].min(),
            time_series_data["day"].max(),
        )
    # The longest optimal segment gives the most robust benchmark average.
    # NOTE(review): assumes segment bounds are Timestamps (`.days` on their
    # difference) — consistent with Phase 1 output, but verify upstream.
    start, end, _ = max(optimal_segments, key=lambda seg: (seg[1] - seg[0]).days)
    benchmark_data = time_series_data[
        (time_series_data["day"] >= start) & (time_series_data["day"] <= end)
    ]
    benchmark = benchmark_data["cpc"].mean() if not benchmark_data.empty else np.nan
    return benchmark, start, end


def _phase2_cpc_wastage(
    item_time_series: Dict[str, pd.DataFrame],
    results: Dict[str, Dict[str, Dict[str, Any]]],
    spend_column: str,
    clicks_column: str,
) -> Dict[str, float]:
    """Phase 2: CPC wastage calculation for spend efficiency analysis.

    Selects a CPC benchmark per item (see ``_select_cpc_benchmark``) and, when
    the benchmark is valid, calculates overspending relative to it. Writes a
    'spend_efficiency' dictionary into ``results[item_id]['cpc']`` in-place.
    Items without Phase 1 'cpc' results are skipped with a warning.

    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        results: Existing results from Phase 1 (modified in-place)
        spend_column: Column name for spend data
        clicks_column: Column name for clicks data

    Returns:
        Dictionary mapping item IDs to CPC benchmark values; only items with
        a non-NaN benchmark are included.
    """
    cpc_benchmarks: Dict[str, float] = {}
    logger.info("Phase 2: Calculating CPC Wastage")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="CPC Wastage"
    ):
        if "cpc" not in results.get(item_id_val, {}):
            logger.warning(
                "Skipping CPC wastage - 'cpc' not in results",
                extra={
                    "item_id": item_id_val,
                    "available_metrics": list(results.get(item_id_val, {}).keys()),
                },
            )
            continue
        cpc_results = results[item_id_val]["cpc"]
        cpc_benchmark, benchmark_period_start, benchmark_period_end = (
            _select_cpc_benchmark(time_series_data, cpc_results["segment_trends"])
        )
        # Total spend is reported on both branches below, so compute it once.
        total_spend = _total_item_spend(time_series_data, spend_column)
        if not np.isnan(cpc_benchmark):
            cpc_benchmarks[item_id_val] = cpc_benchmark
            logger.info(
                "CPC benchmark successfully calculated",
                extra={
                    "item_id": item_id_val,
                    "cpc_benchmark": cpc_benchmark,
                    "benchmark_period_start": benchmark_period_start,
                    "benchmark_period_end": benchmark_period_end,
                },
            )
            cpc_wastage_results = calculate_cpc_wastage(
                time_series_data,
                cpc_benchmark,
                benchmark_period_start=benchmark_period_start,
                benchmark_period_end=benchmark_period_end,
                clicks_column=clicks_column,
            )
            results[item_id_val]["cpc"]["spend_efficiency"] = {
                "actual_wastage_gbp": cpc_wastage_results.get("actual_wastage_gbp", 0.0),
                "benchmark_cpc": cpc_wastage_results.get("benchmark_cpc", cpc_benchmark),
                "benchmark_period_start": benchmark_period_start,
                "benchmark_period_end": benchmark_period_end,
                "overspend_periods": cpc_wastage_results.get("overspend_periods", []),
                "calculation_status": cpc_wastage_results.get("calculation_status", "Unknown"),
                "metric_type": "financial",
                "total_spend": total_spend,
            }
        else:
            # NaN benchmark: emit an empty result with an explanatory status
            # so downstream phases still find the expected structure.
            results[item_id_val]["cpc"]["spend_efficiency"] = {
                "actual_wastage_gbp": 0.0,
                "benchmark_cpc": np.nan,
                "benchmark_period_start": None,
                "benchmark_period_end": None,
                "overspend_periods": [],
                "calculation_status": "Missing Required Data: CPC benchmark is NaN",
                "metric_type": "financial",
                "total_spend": total_spend,
            }
    return cpc_benchmarks
def _phase3_ctr_wastage(
    item_time_series: Dict[str, pd.DataFrame],
    results: Dict[str, Dict[str, Dict[str, Any]]],
    cpc_benchmarks: Dict[str, float],
    ctr_benchmark_method: Literal["stable_period", "peak_period", "moving_average"],
    plot_output_dir: Optional[str],
) -> None:
    """Phase 3: CTR wastage calculation for engagement performance analysis.

    Calculates CTR benchmarks and engagement lost clicks for each item.
    Modifies results dictionary in-place: writes an 'engagement_performance'
    dictionary into ``results[item_id]['ctr']``. Items without Phase 1 'ctr'
    results are skipped silently. The item's CPC benchmark (NaN when absent)
    is passed along so CTR losses can be translated to a GBP reference value.

    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        results: Existing results from Phase 1 & 2 (modified in-place)
        cpc_benchmarks: Dictionary mapping item IDs to CPC benchmark values
        ctr_benchmark_method: Method for CTR benchmark calculation
        plot_output_dir: Optional directory for debug output
    """
    logger.info("Phase 3: Calculating CTR Wastage")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="CTR Wastage"
    ):
        if "ctr" not in results.get(item_id_val, {}):
            continue
        # NaN when Phase 2 produced no valid CPC benchmark for this item.
        cpc_benchmark_for_ctr = cpc_benchmarks.get(item_id_val, np.nan)
        ctr_results = results[item_id_val]["ctr"]
        # Use the time series data directly - all column names should already be lowercase
        # Copy defensively: the wastage helpers receive this frame and the
        # original must stay untouched for later phases.
        time_series_data_copy = time_series_data.copy()
        logger.debug(
            "CTR wastage calculation starting",
            extra={
                "item_id": item_id_val,
                "available_columns": time_series_data_copy.columns.tolist(),
                "sample_data": (
                    time_series_data_copy[["ctr", "impressions"]].head().to_dict()
                    if "ctr" in time_series_data_copy.columns
                    and "impressions" in time_series_data_copy.columns
                    else None
                ),
                "cpc_benchmark_for_ctr": cpc_benchmark_for_ctr,
            },
        )
        # ctr_benchmark_status is only surfaced via the debug log below; the
        # stored status comes from calculate_ctr_wastage instead.
        (
            ctr_benchmark,
            ctr_benchmark_start,
            ctr_benchmark_end,
            ctr_benchmark_status,
        ) = calculate_ctr_benchmark(
            time_series_data_copy,
            method=ctr_benchmark_method,
            segment_trends=ctr_results["segment_trends"],
        )
        logger.debug(
            "CTR benchmark calculated",
            extra={
                "item_id": item_id_val,
                "ctr_benchmark": ctr_benchmark,
                "ctr_benchmark_start": ctr_benchmark_start,
                "ctr_benchmark_end": ctr_benchmark_end,
                "ctr_benchmark_status": ctr_benchmark_status,
            },
        )
        ctr_wastage_results = calculate_ctr_wastage(
            time_series_data_copy,
            ctr_benchmark,
            cpc_benchmark_for_ctr,
            ctr_results["segment_trends"],
            output_dir=plot_output_dir,
        )
        logger.info(
            "CTR wastage calculation completed",
            extra={
                "item_id": item_id_val,
                "ctr_wastage_results": ctr_wastage_results,
            },
        )
        # Prefer values reported by calculate_ctr_wastage; fall back to the
        # locally computed benchmark / neutral defaults when keys are missing.
        results[item_id_val]["ctr"]["engagement_performance"] = {
            "engagement_lost_clicks": ctr_wastage_results.get("engagement_lost_clicks", 0),
            "ctr_decline_percentage_points": ctr_wastage_results.get("ctr_decline_percentage_points", 0.0),
            "ctr_benchmark": ctr_wastage_results.get("ctr_benchmark", ctr_benchmark),
            "ctr_actual_average": ctr_wastage_results.get("ctr_actual_average", 0.0),
            "ctr_benchmark_period_start": ctr_benchmark_start,
            "ctr_benchmark_period_end": ctr_benchmark_end,
            "reference_value_gbp": ctr_wastage_results.get("reference_value_gbp", 0.0),
            "calculation_status": ctr_wastage_results.get(
                "calculation_status", "Unknown"
            ),
            "metric_type": "performance",
            "note": ctr_wastage_results.get("note", "reference_value_gbp is for comparison only"),
        }
def _phase4_finalise_results(
    item_time_series: Dict[str, pd.DataFrame],
    results: Dict[str, Dict[str, Dict[str, Any]]],
    spend_column: str,
    plot_output_dir: Optional[str],
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Phase 4: Finalise results, add metadata, and generate plots.

    Adds analysis metadata (double-counting risk, fatigue detection) and
    generates visualisation plots for each item. Metadata is only attached
    when the item has 'cpc' results; plots are generated for every metric
    present, and a combined CPC+CTR plot when both analyses exist.

    Args:
        item_time_series: Dictionary mapping item IDs to time series DataFrames
        results: Existing results from Phase 1-3 (modified in-place)
        spend_column: Column name for spend data
        plot_output_dir: Optional directory for plot output

    Returns:
        Finalised results dictionary with metadata and plot paths
    """
    logger.info("Phase 4: Finalising and Adding Metadata")
    for item_id_val, time_series_data in tqdm.tqdm(
        item_time_series.items(), desc="Finalising"
    ):
        item_metric_results = results.get(item_id_val, {})
        # Both default to {} so the checks below work when a phase was skipped.
        spend_efficiency = item_metric_results.get("cpc", {}).get(
            "spend_efficiency", {}
        )
        engagement_performance = item_metric_results.get("ctr", {}).get(
            "engagement_performance", {}
        )
        # Add analysis metadata section (NOT combining metrics!)
        if "cpc" in item_metric_results:
            actual_overspend = spend_efficiency.get("actual_wastage_gbp", 0)
            engagement_lost_clicks = engagement_performance.get("engagement_lost_clicks", 0)
            cpc_benchmark = spend_efficiency.get("benchmark_cpc", np.nan)
            # Calculate double-counting risk with new signature
            # NOTE(review): np.isnan assumes benchmark_cpc is a float (NaN when
            # missing), never None — confirm Phase 2 always stores a float.
            if not np.isnan(cpc_benchmark):
                double_counting_risk = detect_double_counting_risk(
                    time_series_data, actual_overspend, engagement_lost_clicks, cpc_benchmark
                )
            else:
                double_counting_risk = "Unknown"
            # Detect terminal decline/creative fatigue for CTR
            ctr_fatigue: Dict[str, Any] = {}
            ctr_benchmark_val = engagement_performance.get("ctr_benchmark", np.nan)
            if "ctr" in item_metric_results and not np.isnan(ctr_benchmark_val):
                ctr_fatigue = detect_terminal_decline(
                    time_series=time_series_data,
                    metric="ctr",
                    benchmark=ctr_benchmark_val,
                    min_decline_days=14,
                )
            # Detect terminal decline for CPC
            cpc_fatigue: Dict[str, Any] = {}
            if not np.isnan(cpc_benchmark):
                cpc_fatigue = detect_terminal_decline(
                    time_series=time_series_data,
                    metric="cpc",
                    benchmark=cpc_benchmark,
                    min_decline_days=14,
                )
            # Store metadata separately - DO NOT combine metrics
            results[item_id_val]["analysis_metadata"] = {
                "double_counting_risk": double_counting_risk,
                "warning": "Spend efficiency and engagement performance measure different aspects and should not be combined",
                "ctr_fatigue": ctr_fatigue,
                "cpc_fatigue": cpc_fatigue,
                # Replacement is recommended if EITHER metric shows fatigue.
                "overall_replacement_recommended": ctr_fatigue.get("replacement_recommended", False) or cpc_fatigue.get("replacement_recommended", False),
            }
            # Ensure total spend is preserved
            if "total_spend" not in spend_efficiency:
                total_spend = (
                    time_series_data[spend_column].sum()
                    if spend_column in time_series_data.columns
                    else 0
                )
                results[item_id_val]["cpc"]["spend_efficiency"]["total_spend"] = total_spend
            logger.debug(
                "Analysis metadata added",
                extra={
                    "item_id": item_id_val,
                    "actual_wastage_gbp": actual_overspend,
                    "engagement_lost_clicks": engagement_lost_clicks,
                    "double_counting_risk": double_counting_risk,
                },
            )
        # Generate plots for each metric
        for metric, results_data in item_metric_results.items():
            # 'analysis_metadata' is a sibling of the metric entries; skip it.
            if metric == "analysis_metadata":
                continue
            # Get appropriate analysis data based on metric
            if metric == "cpc":
                analysis_data = results_data.get("spend_efficiency")
            elif metric == "ctr":
                analysis_data = results_data.get("engagement_performance")
            else:
                analysis_data = results_data.get("wastage_analysis")
            plot_file = plot_item_analysis(
                item_id_val,
                time_series_data,
                metric=metric,
                change_points=results_data["change_points"],
                distances=results_data["distances"],
                threshold=results_data["threshold_value"],
                segment_trends=results_data["segment_trends"],
                output_dir=plot_output_dir,
                wastage_analysis=analysis_data,
                show_changepoints=False,
            )
            results[item_id_val][metric]["plot_file"] = plot_file
        # Generate combined plot if both metrics available
        if spend_efficiency and engagement_performance:
            combined_plot_file = plot_combined_wastage_analysis(
                item_id_val,
                time_series_data,
                cpc_wastage_analysis=spend_efficiency,
                ctr_wastage_analysis=engagement_performance,
                output_dir=plot_output_dir,
            )
            # The same combined plot path is attached under both metrics.
            if "cpc" in item_metric_results:
                results[item_id_val]["cpc"]["combined_plot"] = combined_plot_file
            if "ctr" in item_metric_results:
                results[item_id_val]["ctr"]["combined_plot"] = combined_plot_file
    return results
def analyze_all_items(
    item_time_series: Dict[str, pd.DataFrame],
    metrics: Tuple[str, ...] = ("ctr", "cpc"),
    window_size: int = 7,
    threshold: float = 1.5,
    signature_depth: int = 4,
    method: str = "auto",
    spend_column: str = "amount_spent_gbp",
    clicks_column: str = "link_clicks",
    plot_output_dir: Optional[str] = None,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Runs the core analysis pipeline on a dictionary of time-series DataFrames.

    This is the main analysis function of the CreativeDynamics library, implementing
    a detailed four-phase analysis pipeline using rough path theory and signature
    methods to detect creative fatigue patterns in advertising performance data.

    The analysis pipeline consists of:

    1. **Phase 1**: Change point detection using path signatures and trend classification
    2. **Phase 2**: Spend efficiency calculation (CPC overspending analysis)
    3. **Phase 3**: Engagement performance calculation (CTR decline analysis)
    4. **Phase 4**: Results finalisation, metadata addition, and plot generation

    Args:
        item_time_series: A dictionary mapping item IDs (creative identifiers) to their
            corresponding time-series DataFrames. Each DataFrame must be sorted by date
            and contain a 'day' column plus the relevant metric columns ('ctr', 'cpc',
            'impressions', etc.). Typically prepared by `prepare_item_time_series`.
        metrics: A tuple of metric names to analyze. Supported metrics are 'ctr'
            (click-through rate) and 'cpc' (cost-per-click). Each metric undergoes
            separate change point detection and wastage analysis.
        window_size: The sliding window size (in days) for signature analysis.
            Controls the temporal resolution of change point detection. Larger values
            provide more stable detection but may miss short-term changes.
        threshold: The sensitivity threshold for detecting change points in the
            signature distance metric. Higher values reduce sensitivity, detecting
            only major pattern changes. Lower values increase sensitivity.
        signature_depth: The truncation depth for path signature calculations.
            Controls the complexity of patterns captured by the signature method.
            Higher depths capture more complex patterns but increase computational cost.
        method: Legacy parameter maintained for backward compatibility. The roughpy
            library is always used for signature calculations regardless of this value.
        spend_column: The column name containing advertising spend data in GBP,
            used for spend efficiency calculations. Must exist in the DataFrames.
        clicks_column: The column name containing click count data, used for
            engagement performance calculations. Must exist in the DataFrames.
        plot_output_dir: Optional directory path where analysis plots will be saved.
            If None, plots are saved to a default 'output/plots' directory structure.
            Individual item plots and combined wastage analyses are generated.

    Returns:
        A nested dictionary containing detailed analysis results with the structure:
        `{item_id: {metric: {analysis_details}}}` where analysis_details contains:

        - **change_points**: List of time indices where significant pattern changes occur
        - **segment_trends**: List of (start_date, end_date, trend_classification) tuples
        - **overall_trend**: Dominant trend classification across the entire time series
        - **distances**: Signature distance values used for change point detection
        - **threshold_value**: Actual threshold used for change point detection
        - **pattern_change_detected**: Boolean indicating if declining patterns were found
        - **spend_efficiency** (for CPC analysis): Dictionary containing financial metrics:
            - actual_wastage_gbp: Excess spending vs benchmark CPC
            - benchmark_cpc: Optimal CPC benchmark value
            - benchmark_period_start/end: Date range of benchmark period
            - overspend_periods: List of periods with overspending
            - calculation_status: Status of calculation
            - metric_type: 'financial'
            - total_spend: Total advertising spend for the item
        - **engagement_performance** (for CTR analysis): Dictionary containing performance metrics:
            - engagement_lost_clicks: PRIMARY - Foregone clicks vs benchmark (integer)
            - ctr_decline_percentage_points: CTR decline in percentage points
            - ctr_benchmark: Optimal CTR benchmark value
            - ctr_actual_average: Actual average CTR in period
            - benchmark_period_start/end: Date range of benchmark period
            - reference_value_gbp: OPTIONAL - GBP translation for comparison only
            - calculation_status: Status of calculation
            - metric_type: 'performance'
            - note: Explanation that reference_value_gbp is for comparison only
        - **analysis_metadata**: Dictionary containing cross-metric analysis:
            - double_counting_risk: Risk level ('Low', 'Medium', 'High', 'Unknown')
            - warning: Reminder that metrics should not be combined
        - **plot_file**: File path of generated analysis plot

        The returned dictionary also carries a special `_pipeline_metadata` key
        (timings, item counts, coverage) alongside the per-item entries.

    Raises:
        CreativeDynamicsError: Base exception for library-specific errors during analysis.
        DataValidationError: When input data fails validation requirements.
        SignatureCalculationError: When path signature calculations encounter issues.
        ProcessingError: When analysis pipeline encounters processing failures.

    Example::

        from creativedynamics.data.loader import prepare_item_time_series
        from creativedynamics.core.analyzer import analyze_all_items

        # Load and prepare time series data
        data = load_data('campaign_data.csv')
        items_dict, excluded = prepare_item_time_series(data)

        # Run detailed analysis pipeline
        results = analyze_all_items(
            items_dict,
            metrics=('ctr', 'cpc'),
            window_size=7,
            threshold=1.5,
            plot_output_dir='./analysis_plots'
        )

        # Access results for a specific item
        item_results = results['CREATIVE_001']

        # Financial metric (CPC)
        actual_overspend = item_results['cpc']['spend_efficiency']['actual_wastage_gbp']

        # Performance metric (CTR) - clicks primary, GBP secondary
        engagement_gap = item_results['ctr']['engagement_performance']['engagement_lost_clicks']
        reference_value = item_results['ctr']['engagement_performance']['reference_value_gbp']

        # Overall trend and metadata
        overall_trend = item_results['cpc']['overall_trend']
        risk_level = item_results['analysis_metadata']['double_counting_risk']

    Note:
        This function implements the core CreativeDynamics methodology combining rough
        path theory with practical advertising analytics. The signature-based change
        point detection is particularly effective at identifying subtle creative fatigue
        patterns that traditional statistical methods might miss. The multi-phase
        approach ensures robust wastage calculations while accounting for metric
        interdependencies through double-counting risk assessment.

        Performance scales approximately O(n*m*w) where n=items, m=metrics, w=window_size.
        For large datasets, consider processing in batches or increasing window_size to
        reduce computational overhead.
    """
    # Configuration
    ctr_benchmark_method: Literal["stable_period", "peak_period", "moving_average"] = (
        "peak_period"
    )
    # Start timing and metrics collection
    analysis_start_time = time.perf_counter()
    phase_timings: Dict[str, float] = {}
    total_items_input = len(item_time_series)
    logger.info(
        "Analysis pipeline started",
        extra={
            "total_items": total_items_input,
            "metrics": metrics,
            "window_size": window_size,
            "threshold": threshold,
        }
    )
    # --- Phase 0: Schema Validation (Hoisted) ---
    phase0_start = time.perf_counter()
    validated_items, excluded_items = _validate_items_upfront(
        item_time_series=item_time_series,
        metrics=metrics,
        spend_column=spend_column,
        clicks_column=clicks_column,
    )
    phase_timings["phase0_validation"] = time.perf_counter() - phase0_start
    total_items = len(validated_items)
    validation_excluded = len(excluded_items)
    logger.info(
        f"Phase 0 (validation) completed in {phase_timings['phase0_validation']:.2f}s",
        extra={
            "items_validated": total_items,
            "items_excluded": validation_excluded,
        }
    )
    # Use validated items for subsequent processing
    item_time_series = validated_items
    # --- Phase 1: Change Point Detection and Trend Analysis ---
    phase1_start = time.perf_counter()
    # Delegates to _phase1_change_detection helper
    results = _phase1_change_detection(
        item_time_series=item_time_series,
        metrics=metrics,
        window_size=window_size,
        threshold=threshold,
        signature_depth=signature_depth,
        method=method,
    )
    phase_timings["phase1_change_detection"] = time.perf_counter() - phase1_start
    logger.info(f"Phase 1 completed in {phase_timings['phase1_change_detection']:.2f}s")
    # Add spend column data to Phase 1 results (needed for backward compatibility)
    for item_id_val, time_series_data in item_time_series.items():
        if item_id_val in results:
            for metric in results[item_id_val]:
                if "total_spend" not in results[item_id_val][metric]:
                    total_spend = (
                        time_series_data[spend_column].sum()
                        if spend_column in time_series_data.columns
                        else 0
                    )
                    results[item_id_val][metric]["total_spend"] = total_spend
                if "wastage_analysis" not in results[item_id_val][metric]:
                    results[item_id_val][metric]["wastage_analysis"] = {}
    # --- Phase 2: CPC Wastage Calculation ---
    phase2_start = time.perf_counter()
    # Delegates to _phase2_cpc_wastage helper
    cpc_benchmarks: Dict[str, float] = {}
    if "cpc" in metrics:
        cpc_benchmarks = _phase2_cpc_wastage(
            item_time_series=item_time_series,
            results=results,
            spend_column=spend_column,
            clicks_column=clicks_column,
        )
    phase_timings["phase2_cpc_wastage"] = time.perf_counter() - phase2_start
    logger.info(f"Phase 2 completed in {phase_timings['phase2_cpc_wastage']:.2f}s")
    # --- Phase 3: CTR Wastage Calculation ---
    phase3_start = time.perf_counter()
    # Delegates to _phase3_ctr_wastage helper
    if "ctr" in metrics:
        _phase3_ctr_wastage(
            item_time_series=item_time_series,
            results=results,
            cpc_benchmarks=cpc_benchmarks,
            ctr_benchmark_method=ctr_benchmark_method,
            plot_output_dir=plot_output_dir,
        )
    phase_timings["phase3_ctr_wastage"] = time.perf_counter() - phase3_start
    logger.info(f"Phase 3 completed in {phase_timings['phase3_ctr_wastage']:.2f}s")
    # --- Phase 4: Finalise Results and Generate Plots ---
    phase4_start = time.perf_counter()
    # Delegates to _phase4_finalise_results helper
    results = _phase4_finalise_results(
        item_time_series=item_time_series,
        results=results,
        spend_column=spend_column,
        plot_output_dir=plot_output_dir,
    )
    phase_timings["phase4_finalise"] = time.perf_counter() - phase4_start
    logger.info(f"Phase 4 completed in {phase_timings['phase4_finalise']:.2f}s")
    # Calculate analysis coverage and timing summary
    total_elapsed = time.perf_counter() - analysis_start_time
    items_analysed = len(results)
    coverage_percentage = (items_analysed / total_items * 100) if total_items > 0 else 0.0
    # Log detailed summary
    logger.info(
        "Analysis pipeline completed",
        extra={
            "total_elapsed_seconds": round(total_elapsed, 2),
            "items_input": total_items,
            "items_analysed": items_analysed,
            "coverage_percentage": round(coverage_percentage, 1),
            "phase_timings": {k: round(v, 2) for k, v in phase_timings.items()},
        }
    )
    # Add pipeline metadata to results (accessible for reporting)
    # Note: Using type ignore as _pipeline_metadata is a special key with different structure
    results["_pipeline_metadata"] = {  # type: ignore[assignment]
        "total_elapsed_seconds": round(total_elapsed, 2),
        "phase_timings": {k: round(v, 2) for k, v in phase_timings.items()},
        # "items_input" here is the post-validation count (see Phase 0 log for
        # the raw input count).
        "items_input": total_items,
        "items_analysed": items_analysed,
        "coverage_percentage": round(coverage_percentage, 1),
        "metrics_analysed": list(metrics),
    }
    return results