Source code for delaynet.detrending_methods.z_score

"""Z-Score (ZS) detrending."""

from numba import prange, njit, int64
from numpy import mean as npmean, mod, std, ndarray, arange, integer, copy, zeros

from ..decorators import detrending_method

from ..utils.logging import logging


[docs] @detrending_method def z_score(ts: ndarray, periodicity: int = 1, max_periods: int = -1) -> ndarray: r"""Z-Score (ZS) detrending. The Z-Score (ZS) detrending is a standard score that measures the number of standard deviations a data point is from the mean. It is calculated as: .. math:: Z = \frac{X - \mu}{\sigma} where :math:`X` is the data point, :math:`\mu` is the mean, and :math:`\sigma` is the standard deviation. The Z-Score detrending is applied to a time series by computing the mean and standard deviation of a sub time series around each data point. The sub time series is defined by the periodicity and the maximum number of periods to consider before and after the current value. Mean and standard deviation are computed for each sub time series and used to detrend the data point. The current value is removed from the sub time series to avoid bias. When ``periodicity > 1``, the function compares each point with other points at the same phase of the cycle (same position within each period). When ``periodicity == 1``: - If ``max_periods == -1``, a simple Z-score is applied to the entire time series. - If ``max_periods != -1``, a moving window approach is used, where each point is compared with ``max_periods`` points before and after it. For a valid Z-Score, :math:`2\times\texttt{periodicity}+1 \leq \texttt{len(ts)}` needs to be satisfied. Also, :math:`\texttt{max_periods} \times \texttt{periodicity} \geq\texttt{len(ts)}` results in including all periods. :param ts: Time series to detrend. :type ts: numpy.ndarray :param periodicity: Periodicity of the time series - reoccurrence of the same pattern. :type periodicity: int > 0 :param max_periods: Maximum number of periods to consider before and after the current value. If -1, all periods are considered. :type max_periods: int >= -1 :return: Detrended time series. :rtype: numpy.ndarray :raises ValueError: If the ``periodicity`` is not a positive integer. :raises ValueError: If the ``max_periods`` is not a positive integer or -1. :raises ValueError: If :math:`2\times\texttt{periodicity}+1 \leq \texttt{len(ts)}` is not satisfied. """ if not isinstance(periodicity, (int, integer)) or periodicity <= 0: raise ValueError(f"periodicity must be a positive integer, not {periodicity}.") if not isinstance(max_periods, (int, integer)) or max_periods < -1: raise ValueError( f"`max_periods` must be a positive integer or -1, not {max_periods}." ) if 2 * periodicity + 1 > ts.size: raise ValueError( f"Periodicity ({periodicity}) is too large for the time series of length " f"{ts.size}. Need 2*periodicity+1 <= len(ts)." ) # If max_periods * periodicity + 1 >= len(ts), then max_periods = -1 if max_periods * periodicity + 1 >= ts.size: logging.debug( "max_periods (%s) is larger than or equal to the " "available periods (%s). Setting max_periods to -1 (all periods).", max_periods, ts.size / periodicity, ) max_periods = -1 if periodicity == 1 and max_periods == -1: # Simple case, no slicing needed ts_std = std(ts) if ts_std == 0: logging.warning( "Standard deviation of the whole time series is 0, " "returning a zero array of the same size." ) return zeros(ts.size) return (ts - npmean(ts)) / ts_std ts2 = ( _z_score_loop_all(periodicity, ts) if max_periods == -1 else _z_score_loop_partial(periodicity, max_periods, ts) ) return ts2
@njit(parallel=True) def _z_score_loop_all(periodicity, ts): # pragma: no cover ts2 = copy(ts) for k in prange(ts.size): sub_ts = _get_sub_ts_all_periods(ts, k, periodicity) st_dev = std(sub_ts) if st_dev > 0: ts2[k] = (ts[k] - npmean(sub_ts)) / st_dev else: # When std is 0, all points at the same phase have the same value # The detrended value should be 0 (point is exactly at the mean) ts2[k] = 0.0 return ts2 @njit(parallel=True) def _z_score_loop_partial(periodicity, max_periods, ts): # pragma: no cover ts2 = copy(ts) for k in prange(ts.size): sub_ts = _get_sub_ts_partial(ts, k, periodicity, max_periods) st_dev = std(sub_ts) if st_dev > 0: ts2[k] = (ts[k] - npmean(sub_ts)) / st_dev else: # When std is 0, all points at the same phase have the same value # The detrended value should be 0 (point is exactly at the mean) ts2[k] = 0.0 return ts2 @njit def _get_sub_ts_all_periods(ts, k, periodicity): # pragma: no cover """Get the sub time series for all periods.""" # Get the phase of the current point phase = mod(k, periodicity) # Get all indices with the same phase indices = arange(phase, len(ts), periodicity, dtype=int64) # Remove the current index indices = indices[indices != k] return ts[indices] @njit def _get_sub_ts_partial(ts, k, periodicity, max_periods): # pragma: no cover """Get the sub time series for a limited number of periods.""" remainder = mod(k, periodicity) start_index = max(remainder, k - (max_periods * periodicity)) end_index = min(len(ts) - remainder, k + ((max_periods + 1) * periodicity)) indices = arange(start_index, end_index, periodicity, dtype=int64) # Remove the current index indices = indices[indices != k] return ts[indices]