#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Signature Computation for CreativeDynamics Library
This module provides signature computation functionality using the roughpy library
for the CreativeDynamics time-series analysis package.
Copyright (c) 2024-2025 T&P Data Science Ltd.
Author(s): Charles Shaw
Created: 2024-07-19
Last Modified: 2025-11-26
"""
import hashlib
import logging
import warnings
from functools import lru_cache
from typing import List, Optional, Tuple, Union
import numpy as np
from numpy.typing import NDArray
import roughpy as rp
from creativedynamics.exceptions import (CreativeDynamicsError,
SignatureCalculationError)
from creativedynamics.core.constants import SIGNATURE_CACHE_SIZE
# Import centralized logging and exceptions
from creativedynamics.utils.logging_config import \
setup_logging as centralized_setup_logging
# Initialize centralized logging
# NOTE(review): this call runs at import time, so merely importing this module
# triggers whatever configuration setup_logging performs — confirm that is intended.
centralized_setup_logging()
# Module-level logger; obtained after the centralized setup call above.
logger = logging.getLogger(__name__)
# =============================================================================
# Caching Infrastructure
# =============================================================================
# Module-level cache statistics.
# _cache_hits is incremented by calculate_path_signature_cached() whenever the
# LRU cache's own hit counter advanced during a call.
# _cache_misses is incremented inside _cached_signature_computation(), whose
# body only executes when the LRU cache has no stored result.
# Both counters are reset to zero by clear_signature_cache().
_cache_hits: int = 0
_cache_misses: int = 0
def _array_to_cache_key(arr: NDArray[np.float64]) -> str:
    """Derive a hashable cache key from the raw contents of an array.

    Args:
        arr: Array whose underlying bytes are digested.

    Returns:
        Hexadecimal MD5 digest of ``arr.tobytes()``.
    """
    # Only the raw buffer is hashed; shape and dtype are not part of the key.
    raw_buffer = arr.tobytes()
    digest = hashlib.md5()
    digest.update(raw_buffer)
    return digest.hexdigest()
# [docs] -- Sphinx "view source" link marker left over from HTML extraction; not code.
def get_cache_stats() -> dict:
    """Report usage counters for the signature cache.

    Returns:
        Dictionary with the keys ``hits``, ``misses``, ``total`` and
        ``hit_rate_percent``. The rate is 0.0 when no lookups have been
        recorded yet.
    """
    hits, misses = _cache_hits, _cache_misses
    lookups = hits + misses
    if lookups > 0:
        percentage = hits / lookups * 100
    else:
        # Avoid dividing by zero before the cache has been used.
        percentage = 0.0
    return dict(
        hits=hits,
        misses=misses,
        total=lookups,
        hit_rate_percent=percentage,
    )
# [docs] -- Sphinx "view source" link marker left over from HTML extraction; not code.
def clear_signature_cache() -> None:
    """Empty the LRU signature cache and zero the hit/miss counters."""
    global _cache_hits, _cache_misses
    # Drop every memoised result held by the lru_cache wrapper.
    _cached_signature_computation.cache_clear()
    # Restart the module-level statistics alongside the cache itself.
    _cache_hits = _cache_misses = 0
    logger.debug("Signature cache cleared")
@lru_cache(maxsize=SIGNATURE_CACHE_SIZE)
def _cached_signature_computation(
    time_hash: str,
    metric_hash: str,
    time_bytes: bytes,
    metric_bytes: bytes,
    length: int,
    depth: int,
    normalize: bool,
) -> Tuple[float, ...]:
    """Memoised signature computation (internal).

    The function is wrapped in an LRU cache, so every argument takes part in
    the cache key. The byte strings are decoded back into float64 arrays for
    the actual computation.

    Args:
        time_hash: MD5 digest of the time values (part of the cache key).
        metric_hash: MD5 digest of the metric values (part of the cache key).
        time_bytes: Serialized float64 time values.
        metric_bytes: Serialized float64 metric values.
        length: Number of points; part of the cache key, not read in the body.
        depth: Signature truncation depth.
        normalize: Whether the path is normalized before the computation.

    Returns:
        The signature as a tuple of floats (hashable, so it can be cached).
    """
    global _cache_misses
    # This body only runs when lru_cache has no stored entry, i.e. on a miss.
    _cache_misses += 1
    # Decode the serialized buffers back into float64 arrays (time first).
    decoded_time, decoded_metric = (
        np.frombuffer(raw, dtype=np.float64) for raw in (time_bytes, metric_bytes)
    )
    signature = _compute_signature_uncached(
        decoded_time, decoded_metric, depth, normalize
    )
    return tuple(signature.tolist())
def _compute_signature_uncached(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
) -> NDArray[np.float64]:
    """Compute the truncated signature of the 2D path (time, metric), uncached.

    This is the core computation logic without caching. Use
    calculate_path_signature() or calculate_path_signature_cached() for
    public API access.

    Args:
        time_values: Time coordinates of the path; flattened to 1D.
        metric_values: Metric coordinates of the path; flattened to 1D.
        depth: Truncation depth of the signature. Defaults to 4.
        normalize: If True, time is rescaled to [0, 1] using its first and
            last value and the metric is min-max rescaled to [0, 1]. A
            channel whose range is not positive becomes all zeros.
            Defaults to True.

    Returns:
        The signature produced by roughpy as a numpy array, or an empty
        array when the path has fewer than two points.

    Raises:
        ValueError: If the flattened inputs differ in length.
        SignatureCalculationError: If the roughpy computation fails; the
            original exception is chained as the cause.
    """
    # Coerce and flatten first so that lists and (1, n) / (n, 1) shaped inputs
    # are validated on the number of points they actually contain.
    # np.array(...).flatten() yields fresh copies, so the caller's data is
    # never modified by the normalization below.
    path_time = np.array(time_values, dtype=np.float64).flatten()
    path_metric = np.array(metric_values, dtype=np.float64).flatten()

    if path_time.size != path_metric.size:
        raise ValueError("Time and metric arrays must have the same length.")
    if path_time.size < 2:
        # roughpy requires at least 2 points to form a segment
        logger.debug(
            "Path has less than 2 points, returning empty signature",
            extra={"path_length": path_time.size},
        )
        return np.array([])

    if normalize:
        # Normalize time to [0, 1]; the path is assumed to be ordered by time,
        # so the first and last entries are taken as its extremes.
        min_time = path_time[0]
        time_range = path_time[-1] - min_time
        if time_range > 0:
            path_time = (path_time - min_time) / time_range
        else:
            # All time points coincide (or the ordering assumption is violated).
            path_time = np.zeros_like(path_time)

        # Min-max normalize metric to [0, 1]
        min_metric = np.min(path_metric)
        metric_range = np.max(path_metric) - min_metric
        if metric_range > 0:
            path_metric = (path_metric - min_metric) / metric_range
        else:
            # Constant metric: collapse to zeros.
            path_metric = np.zeros_like(path_metric)

    # Combine into a 2D path of shape (n_points, 2): [[t1, m1], [t2, m2], ...]
    path = np.column_stack((path_time, path_metric))

    # roughpy returns the full signature including the 0-th level term (scalar 1)
    try:
        # Create roughpy context for the path dimensions and depth
        context = rp.get_context(width=path.shape[1], depth=depth, coeffs=rp.DPReal)
        increments = np.diff(path, axis=0)
        n_increments = increments.shape[0]

        # Pin every increment to an explicit integer parameter 0..n-1 and query
        # the closed-open interval [0, n), which covers all of them. The
        # signature of a piecewise-linear path does not depend on how it is
        # parameterized, and the time information is still carried by the time
        # channel of the increments. Querying by the path's own time range
        # instead (e.g. [0, 1) after normalization) would not line up with the
        # increment parameters and could silently drop increments.
        indices = np.arange(n_increments, dtype=np.float64)
        lie_increment_stream = rp.LieIncrementStream.from_increments(
            increments, indices=indices, ctx=context
        )
        interval = rp.RealInterval(0.0, float(n_increments))
        return np.array(lie_increment_stream.signature(interval))
    except Exception as e:
        failure_context = {
            "path_shape": path.shape,
            "depth": depth,
            "normalize": normalize,
        }
        logger.error(
            "Error during roughpy signature computation",
            exc_info=True,
            extra={**failure_context, "error": str(e)},
        )
        # Re-raise as custom exception with context
        raise SignatureCalculationError(
            "Failed to compute path signature using roughpy",
            {**failure_context, "original_error": str(e)},
        ) from e
# [docs] -- Sphinx "view source" link marker left over from HTML extraction; not code.
def calculate_path_signature(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
) -> NDArray[np.float64]:
    """
    Compute the signature of a (time, metric) path with roughpy, without caching.

    Args:
        time_values (numpy.ndarray): Array of time values (1D).
        metric_values (numpy.ndarray): Array of metric values (1D).
        depth (int): Truncation depth for signature computation. Defaults to 4.
        normalize (bool): Whether to normalize the path before computing the
            signature. Time is scaled to [0, 1]. Metric is min-max scaled to
            [0, 1]. Defaults to True.

    Returns:
        numpy.ndarray:
            Flattened signature of the path. Returns an empty array if path
            length < 2.
    """
    # Delegate straight to the core implementation; nothing is memoised here.
    signature = _compute_signature_uncached(
        time_values, metric_values, depth=depth, normalize=normalize
    )
    return signature
# [docs] -- Sphinx "view source" link marker left over from HTML extraction; not code.
def calculate_path_signature_cached(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
) -> NDArray[np.float64]:
    """
    Compute a path signature, reusing LRU-cached results for repeated inputs.

    Results are memoised on the contents of the input arrays together with
    ``depth`` and ``normalize``, which speeds up overlapping sliding windows
    and repeated analyses.

    Args:
        time_values (numpy.ndarray): Array of time values (1D).
        metric_values (numpy.ndarray): Array of metric values (1D).
        depth (int): Truncation depth for signature computation. Defaults to 4.
        normalize (bool): Whether to normalize the path. Defaults to True.

    Returns:
        numpy.ndarray: Flattened signature of the path; empty when fewer than
        two points are supplied.

    Raises:
        ValueError: If the two inputs differ in length.

    Note:
        Use clear_signature_cache() to clear the cache when memory
        is a concern or between different analysis runs.
    """
    global _cache_hits

    n_time, n_metric = len(time_values), len(metric_values)
    if n_time != n_metric:
        raise ValueError("Time and metric arrays must have the same length.")
    if n_time < 2:
        # Too short to form a segment; the cache and its counters are untouched.
        return np.array([])

    # Normalise dtype and shape so that equal data always yields equal keys.
    flat_time = np.array(time_values, dtype=np.float64).flatten()
    flat_metric = np.array(metric_values, dtype=np.float64).flatten()

    # Snapshot the LRU hit counter so a hit can be detected after the call.
    hits_before = _cached_signature_computation.cache_info().hits

    cached_values = _cached_signature_computation(
        _array_to_cache_key(flat_time),
        _array_to_cache_key(flat_metric),
        flat_time.tobytes(),
        flat_metric.tobytes(),
        len(flat_time),
        depth,
        normalize,
    )

    # The LRU hit counter only advances when the result came from the cache.
    if _cached_signature_computation.cache_info().hits > hits_before:
        _cache_hits += 1

    return np.array(cached_values, dtype=np.float64)
# Maintaining the original function signature for backward compatibility.
# The `method` parameter is kept for compatibility but is no longer used.
# [docs] -- Sphinx "view source" link marker left over from HTML extraction; not code.
def compute_signature(
    time_values: NDArray[np.float64],
    metric_values: NDArray[np.float64],
    depth: int = 4,
    normalize: bool = True,
    method: Optional[str] = None,
) -> NDArray[np.float64]:  # method is ignored
    """
    Backward-compatible entry point for computing the signature of a 2D path
    (time vs. metric) with the ``roughpy`` library.

    The call is forwarded to calculate_path_signature(), which builds the path
    from the time and metric values, optionally normalizes it, and computes
    the truncated signature up to ``depth``.

    Args:
        time_values (numpy.ndarray): Array of time values (1D).
        metric_values (numpy.ndarray): Array of metric values (1D).
        depth (int, optional): The truncation depth for the signature
            computation. Defaults to 4.
        normalize (bool, optional): Whether to normalize the path data before
            computing the signature. If True, time is scaled to [0, 1] and the
            metric is min-max scaled to [0, 1]. Defaults to True.
        method (Optional[str], optional): Ignored; present only for backward
            compatibility. Passing any value other than None emits a
            DeprecationWarning. Defaults to None.

    Returns:
        numpy.ndarray: A flattened numpy array representing the computed path
        signature. Returns an empty array if the path has fewer than 2 points.

    Raises:
        ValueError: If the time and metric inputs differ in length.
        SignatureCalculationError: If the underlying roughpy computation fails.
    """
    method_supplied = method is not None
    if method_supplied:
        # stacklevel=2 attributes the warning to the caller of this wrapper.
        warnings.warn(
            "The 'method' parameter is deprecated and will be removed in v1.0.0. "
            "The roughpy library is always used for signature calculations.",
            category=DeprecationWarning,
            stacklevel=2,
        )
    signature = calculate_path_signature(time_values, metric_values, depth, normalize)
    return signature
# NOTE: the legacy ``compute_deep_rough_paths_signature`` function has been removed
# to clean up this module; it is not used internally by creative_fatigue_analysis.py.
# If external callers still need that exact name for strict backward compatibility,
# it can be restored as a thin alias:
# def compute_deep_rough_paths_signature(...):
#     return compute_signature(...)