# Source code for creativedynamics.core.signature_calculator

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Signature Computation for CreativeDynamics Library

This module provides signature computation functionality using the roughpy library
for the CreativeDynamics time-series analysis package.

Copyright (c) 2024-2025 T&P Data Science Ltd.
Author(s): Charles Shaw
Created: 2024-07-19
Last Modified: 2025-11-26
"""

import hashlib
import logging
import warnings
from functools import lru_cache
from typing import List, Optional, Tuple, Union

import numpy as np
from numpy.typing import NDArray
import roughpy as rp

from creativedynamics.exceptions import (CreativeDynamicsError,
                                         SignatureCalculationError)
from creativedynamics.core.constants import SIGNATURE_CACHE_SIZE
# Import centralized logging and exceptions
from creativedynamics.utils.logging_config import \
    setup_logging as centralized_setup_logging

# Configure logging through the project's centralized helper as soon as this
# module is imported, then bind a module-level logger named after this module.
# NOTE: this is an import-time side effect.
centralized_setup_logging()
logger = logging.getLogger(__name__)

# =============================================================================
# Caching Infrastructure
# =============================================================================

# Module-level cache statistics.
# ``_cache_hits`` is incremented by calculate_path_signature_cached() when the
# LRU cache answered the lookup; ``_cache_misses`` is incremented inside
# _cached_signature_computation(), whose body only runs on a miss.
# Both counters are reset to zero by clear_signature_cache().
_cache_hits: int = 0
_cache_misses: int = 0


def _array_to_cache_key(arr: NDArray[np.float64]) -> str:
    """Derive a hashable string key from the raw contents of a numpy array.

    Only the bytes returned by ``arr.tobytes()`` are hashed, so the array's
    shape and dtype are not part of the key.

    Args:
        arr: Numpy array whose contents should be fingerprinted.

    Returns:
        Hexadecimal MD5 digest (32 characters) of the array bytes.
    """
    digest = hashlib.md5()
    digest.update(arr.tobytes())
    return digest.hexdigest()


def get_cache_stats() -> dict:
    """Report usage statistics for the signature cache.

    Returns:
        Dictionary with the keys ``hits``, ``misses``, ``total`` (hits plus
        misses) and ``hit_rate_percent`` (0.0 when nothing has been looked
        up yet).
    """
    hits, misses = _cache_hits, _cache_misses
    lookups = hits + misses

    if lookups > 0:
        hit_rate = hits / lookups * 100
    else:
        # Avoid dividing by zero before the first lookup.
        hit_rate = 0.0

    return {
        "hits": hits,
        "misses": misses,
        "total": lookups,
        "hit_rate_percent": hit_rate,
    }
def clear_signature_cache() -> None:
    """Empty the signature LRU cache and zero the hit/miss counters."""
    global _cache_hits, _cache_misses

    _cached_signature_computation.cache_clear()
    # Statistics describe the cache contents, so they restart with it.
    _cache_hits = _cache_misses = 0
    logger.debug("Signature cache cleared")
@lru_cache(maxsize=SIGNATURE_CACHE_SIZE)
def _cached_signature_computation(
    time_hash: str,
    metric_hash: str,
    time_bytes: bytes,
    metric_bytes: bytes,
    length: int,
    depth: int,
    normalize: bool,
) -> Tuple[float, ...]:
    """Cached signature computation (internal).

    This function is wrapped with ``lru_cache``; every argument below is part
    of the cache key. The arrays are rebuilt from the byte strings, so the
    hash arguments are not needed for the computation itself.

    Args:
        time_hash: MD5 hash of the time values (part of the cache key).
        metric_hash: MD5 hash of the metric values (part of the cache key).
        time_bytes: Serialized float64 time values.
        metric_bytes: Serialized float64 metric values.
        length: Length of the arrays. Not read by the body; it only
            contributes to the cache key.
        depth: Signature truncation depth.
        normalize: Whether to normalize the path before computing.

    Returns:
        Tuple of signature values (a tuple so the cached result is immutable).

    Raises:
        SignatureCalculationError: Propagated from
            ``_compute_signature_uncached`` when the roughpy computation fails.
    """
    global _cache_misses
    # The body only executes when lru_cache has no stored result for these
    # arguments, so every execution is by definition a cache miss.
    _cache_misses += 1

    # Reconstruct arrays from bytes
    time_values = np.frombuffer(time_bytes, dtype=np.float64)
    metric_values = np.frombuffer(metric_bytes, dtype=np.float64)

    # Compute signature using the uncached implementation
    result = _compute_signature_uncached(time_values, metric_values, depth, normalize)
    return tuple(result.tolist())


def _compute_signature_uncached(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
) -> NDArray[np.float64]:
    """Uncached signature computation (internal implementation).

    This is the core computation logic without caching. Use
    calculate_path_signature() or calculate_path_signature_cached() for
    public API access.

    Args:
        time_values: Time values of the path; flattened to 1D float64.
        metric_values: Metric values of the path; flattened to 1D float64.
        depth: Truncation depth handed to the roughpy context.
        normalize: If True, time is rescaled using its first and last entries
            and the metric is min-max rescaled, both to [0, 1]. A constant
            time or metric axis is replaced by zeros.

    Returns:
        The signature as a numpy array, or an empty array when the path has
        fewer than 2 points.

    Raises:
        ValueError: If the two inputs differ in length.
        SignatureCalculationError: If any step of the roughpy computation
            raises; the original exception is chained as the cause.
    """
    if len(time_values) != len(metric_values):
        raise ValueError("Time and metric arrays must have the same length.")

    if len(time_values) < 2:
        # roughpy requires at least 2 points to form a segment
        logger.debug(
            "Path has less than 2 points, returning empty signature",
            extra={"path_length": len(time_values)},
        )
        return np.array([])

    # np.array(...).flatten() already yields fresh 1D float64 copies, so the
    # caller's data is never modified and no further copy is required.
    path_time = np.array(time_values, dtype=np.float64).flatten()
    path_metric = np.array(metric_values, dtype=np.float64).flatten()

    if normalize:
        # Normalize time to [0, 1]; the path is assumed to be ordered by time,
        # so the first and last entries are used as the extremes.
        min_time = path_time[0]
        time_range = path_time[-1] - min_time
        if time_range > 0:
            path_time = (path_time - min_time) / time_range
        else:
            # All time points are the same (should not happen for
            # days_since_start if len > 1)
            path_time = np.zeros_like(path_time)

        # Min-max normalize metric to [0, 1]
        min_metric = np.min(path_metric)
        metric_range = np.max(path_metric) - min_metric
        if metric_range > 0:
            path_metric = (path_metric - min_metric) / metric_range
        else:
            # All metric values are the same
            path_metric = np.zeros_like(path_metric)

    # Combine into a 2D path of shape (n_points, 2): [[t1, m1], [t2, m2], ...]
    path = np.column_stack((path_time, path_metric))

    # Compute the signature using roughpy.
    # roughpy returns the full signature including the 0-th level term (scalar 1)
    try:
        # Create roughpy context for the path dimensions and depth
        context = rp.get_context(width=path.shape[1], depth=depth, coeffs=rp.DPReal)

        # Compute increments of the path
        increments = np.diff(path, axis=0)
        n_increments = increments.shape[0]

        # Place the i-th increment at stream parameter i and query [0, n).
        # The stream parameter is independent of the (possibly normalized)
        # time coordinate of the path. Querying [path_time[0], path_time[-1])
        # instead -- which is [0, 1) after normalization -- would only cover
        # the increments whose parameter falls inside that range.
        # NOTE(review): RoughPy is understood to default to indices 0..n-1
        # when none are given; passing them explicitly removes any dependence
        # on that default. Confirm against the installed RoughPy version.
        indices = np.arange(n_increments, dtype=np.float64)
        lie_increment_stream = rp.LieIncrementStream.from_increments(
            increments, indices=indices, ctx=context
        )

        # Note: mypy shows errors here due to incomplete roughpy type stubs,
        # but runtime works correctly
        interval = rp.RealInterval(inf=0.0, sup=float(n_increments))  # type: ignore[call-arg]
        return np.array(lie_increment_stream.signature(interval))

    except Exception as e:
        logger.error(
            "Error during roughpy signature computation",
            exc_info=True,
            extra={
                "path_shape": path.shape,
                "depth": depth,
                "normalize": normalize,
                "error": str(e),
            },
        )
        # Re-raise as custom exception with context
        raise SignatureCalculationError(
            "Failed to compute path signature using roughpy",
            {
                "path_shape": path.shape,
                "depth": depth,
                "normalize": normalize,
                "original_error": str(e),
            },
        ) from e
def calculate_path_signature(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
) -> NDArray[np.float64]:
    """
    Compute the signature of a (time, metric) path with the roughpy library.

    This is the uncached public entry point; it forwards its arguments
    unchanged to the internal implementation.

    Args:
        time_values (numpy.ndarray): Array of time values (1D).
        metric_values (numpy.ndarray): Array of metric values (1D).
        depth (int): Truncation depth for signature computation. Defaults to 4.
        normalize (bool): Whether to normalize the path before computing the
            signature. Time is scaled to [0, 1]. Metric is min-max scaled to
            [0, 1]. Defaults to True.

    Returns:
        numpy.ndarray: Flattened signature of the path. Returns an empty array
        if path length < 2.
    """
    return _compute_signature_uncached(
        time_values,
        metric_values,
        depth=depth,
        normalize=normalize,
    )
def calculate_path_signature_cached(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
) -> NDArray[np.float64]:
    """
    Compute a path signature, reusing earlier results through an LRU cache.

    Results are cached on the contents of the input arrays together with
    ``depth`` and ``normalize``, which avoids recomputation when the same
    window is analysed more than once.

    Args:
        time_values (numpy.ndarray): Array of time values (1D).
        metric_values (numpy.ndarray): Array of metric values (1D).
        depth (int): Truncation depth for signature computation. Defaults to 4.
        normalize (bool): Whether to normalize the path. Defaults to True.

    Returns:
        numpy.ndarray: Flattened signature of the path, as a new array on
        every call. Empty if the path has fewer than 2 points.

    Raises:
        ValueError: If the two inputs differ in length.

    Note:
        Use clear_signature_cache() to clear the cache when memory is a
        concern or between different analysis runs.
    """
    global _cache_hits

    n_points = len(time_values)
    if n_points != len(metric_values):
        raise ValueError("Time and metric arrays must have the same length.")
    if n_points < 2:
        return np.array([])

    # Force a consistent 1D float64 layout so equal data gives equal bytes.
    flat_time = np.array(time_values, dtype=np.float64).flatten()
    flat_metric = np.array(metric_values, dtype=np.float64).flatten()

    # lru_cache does not say whether a given call was a hit, so compare its
    # hit count before and after the lookup.
    hits_before = _cached_signature_computation.cache_info().hits

    signature_values = _cached_signature_computation(
        _array_to_cache_key(flat_time),
        _array_to_cache_key(flat_metric),
        flat_time.tobytes(),
        flat_metric.tobytes(),
        len(flat_time),
        depth,
        normalize,
    )

    if _cached_signature_computation.cache_info().hits > hits_before:
        _cache_hits += 1

    return np.array(signature_values, dtype=np.float64)
# Maintaining the original function signature for backward compatibility.
# The `method` parameter is kept for compatibility; it no longer selects an
# implementation and only triggers a deprecation warning when supplied.
def compute_signature(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
    method: Optional[str] = None,
) -> NDArray[np.float64]:
    """
    Backward-compatible wrapper around ``calculate_path_signature``.

    The signature of the 2D path (time vs. metric) is always computed with
    the ``roughpy`` library; this function only adds handling of the
    deprecated ``method`` argument before delegating.

    Args:
        time_values (numpy.ndarray): Array of time values (1D).
        metric_values (numpy.ndarray): Array of metric values (1D).
        depth (int, optional): The truncation depth for the signature
            computation. Defaults to 4.
        normalize (bool, optional): Whether to normalize the path data before
            computing the signature. If True, time is scaled to [0, 1] and the
            metric is min-max scaled to [0, 1]. Defaults to True.
        method (Optional[str], optional): Deprecated. Its value does not
            influence the computation; passing anything other than None emits
            a ``DeprecationWarning``. Defaults to None.

    Returns:
        numpy.ndarray: A flattened numpy array representing the computed path
        signature. Returns an empty array if the path has fewer than 2 points.
        Exceptions raised by ``calculate_path_signature`` propagate to the
        caller unchanged.
    """
    method_supplied = method is not None
    if method_supplied:
        # stacklevel=2 attributes the warning to the caller of this function.
        warnings.warn(
            "The 'method' parameter is deprecated and will be removed in v1.0.0. "
            "The roughpy library is always used for signature calculations.",
            category=DeprecationWarning,
            stacklevel=2,
        )

    return calculate_path_signature(
        time_values,
        metric_values,
        depth=depth,
        normalize=normalize,
    )
# NOTE: The old ``compute_deep_rough_paths_signature`` function has been removed
# to clean up this module. It is not used internally by
# creative_fatigue_analysis.py. If strict backward compatibility for that
# specific name is needed externally, it can be reintroduced as an alias:
#
#     def compute_deep_rough_paths_signature(...):
#         return compute_signature(...)