Source code for tssearch.distances.elastic_utils

import numpy as np
from numba import njit, prange
from tssearch.utils.preprocessing import standardization


@njit(parallel=True, fastmath=True)
def _cost_matrix(x, y):
    """

    Parameters
    ----------
    x: nd-array
        Time series x (query).
    y: nd-array
        Time series y.

    Returns
    -------
    c: nd-array
        The cost matrix.
    """
    l1 = x.shape[0]
    l2 = y.shape[0]
    c = np.zeros((l1, l2), dtype=np.float32)

    for i in prange(l1):
        for j in prange(l2):
            c[i, j] = (x[i] - y[j]) ** 2

    return c


@njit(parallel=True, fastmath=True)
def _multidimensional_cost_matrix(x, y, weight):
    """Helper function for fast computation of cost matrix in cost_matrix_diff_vec.
    Defined outside to prevent recompilation from numba

    Parameters
    ----------
    x: nd-array
        Time series x (query).
    y: nd-array
        Time series y.

    Returns
    -------
    c: nd-array
        The cost matrix.
    """
    l1 = x.shape[0]
    l2 = y.shape[0]
    l3 = x.shape[1]
    c = np.zeros((l1, l2), dtype=np.float32)

    for i in prange(l1):
        for j in prange(l2):
            dist = 0.0
            for di in range(l3):
                diff = x[i, di] - y[j, di]
                dist += weight[i, di] * (diff * diff)
            c[i, j] = dist ** 0.5

    return c


@njit(nogil=True, fastmath=True)
def _accumulated_cost_matrix(ac):
    """Fast computation of accumulated cost matrix using cost matrix.

    Parameters
    ----------
    ac: nd-array
        Given cost matrix c, ac = acc_initialization(...), ac[1:, 1:] = c.

    Returns
    -------
        The accumulated cost matrix.
    """
    for i in range(ac.shape[0] - 1):
        for j in range(ac.shape[1] - 1):
            ac[i + 1, j + 1] += min(ac[i, j + 1], ac[i + 1, j], ac[i, j])
    return ac


[docs]def acc_initialization(x, y, _type, tolerance=0): """Initializes the cost matrix according to the dtw type. Parameters ---------- x: nd-array Time series x (query). y: nd-array Time series y. _type: string Name of dtw type. tolerance: int Tolerance value. Returns ------- ac: nd-array The accumulated cost matrix. """ ac = np.zeros((x + 1, y + 1)) if _type == "dtw": ac[0, 1:] = np.inf ac[1:, 0] = np.inf elif _type == "oe-dtw": ac[0, 1:] = np.inf ac[1:, 0] = np.inf elif _type == "obe-dtw" or _type == "sub-dtw" or _type == "search": ac[1:, 0] = np.inf elif _type == "psi-dtw": ac[0, tolerance + 1 :] = np.inf ac[tolerance + 1 :, 0] = np.inf else: ac[0, 1:] = np.inf ac[1:, 0] = np.inf return ac
[docs]def cost_matrix(x, y, alpha=1, weight=None): """Computes cost matrix using a specified distance (dist) between two time series. Parameters ---------- x: nd-array Time series x (query). y: nd-array Time series y. dist: function The distance used as a local cost measure. None defaults to the squared euclidean distance. \**kwargs: See below: * *do_sign_norm* (``bool``) -- If ``True`` the signals will be normalized before computing the DTW, (default: ``False``) * *do_dist_norm* (``bool``) -- If ``True`` the DTW distance will be normalized by dividing the summation of the path dimension. (default: ``True``) * *window* (``String``) -- Selects the global window constrains. Available options are ``None`` and ``sakoe-chiba``. (default: ``None``) * *factor* (``Float``) -- Selects the global constrain factor. (default: ``min(xl, yl) * .50``) Returns ------- c: nd-array The cost matrix. """ if weight is None: weight = np.ones_like(x) if len(np.shape(weight)) == 1: weight = weight.reshape(-1, 1) if alpha == 1: C_d = 1 if len(np.shape(x)) == 1: C_n = (_cost_matrix(x, y) * weight) / np.max(weight) else: C_n = _multidimensional_cost_matrix(x, y, weight) else: # standardization parameters abs_norm = np.mean(x, axis=0), np.std(x, axis=0) diff_norm = np.mean(np.diff(x, axis=0), axis=0), np.std(np.diff(x, axis=0), axis=0) # Derivative calculation and standardization _x = standardization(np.diff(x, axis=0), param=diff_norm) _y = standardization(np.diff(y, axis=0), param=diff_norm) # same length of derivative x = standardization(x[:-1], param=abs_norm) y = standardization(y[:-1], param=abs_norm) weight = weight[:-1] if len(np.shape(x)) == 1: C_d = _cost_matrix(_x, _y) * weight C_n = _cost_matrix(x, y) * weight else: C_d = _multidimensional_cost_matrix(_x, _y, weight) C_n = _multidimensional_cost_matrix(x, y, weight) c = alpha * C_n + (1 - alpha) * C_d return c
[docs]def accumulated_cost_matrix(c, **kwargs): """ Parameters ---------- c: nd-array The cost matrix. \**kwargs: Returns ------- ac: nd-array The accumulated cost matrix. """ xl, yl = np.shape(c) window = kwargs.get("window", None) factor = kwargs.get("factor", np.min((xl, yl)) * 0.50) dtw_type = kwargs.get("dtw_type", "dtw") tolerance = kwargs.get("tolerance", 0) if window == "sakoe-chiba": c[np.abs(np.diff(np.indices(c.shape), axis=0))[0] > factor] = np.inf ac = acc_initialization(xl, yl, dtw_type, tolerance) ac[1:, 1:] = c.copy() ac = _accumulated_cost_matrix(ac)[1:, 1:] return ac
[docs]@njit(nogil=True, fastmath=True) def traceback(ac): """Computes the traceback path of the matrix c. Parameters ---------- ac: nd-array The accumulated cost matrix. Returns ------- Coordinates p and q of the minimum path. """ i, j = np.array(ac.shape) - 2 p, q = [i], [j] while (i > 0) and (j > 0): tb = 0 if ac[i, j + 1] < ac[i, j]: tb = 1 if ac[i + 1, j] < ac[i, j + tb]: tb = 2 if tb == 0: i -= 1 j -= 1 elif tb == 1: i -= 1 else: j -= 1 p.insert(0, i) q.insert(0, j) while j > 0: j -= 1 p.insert(0, i) q.insert(0, j) while i > 0: i -= 1 p.insert(0, i) q.insert(0, j) return np.array(p), np.array(q)
[docs]@njit(nogil=True, fastmath=True) def traceback_adj(ac): """Computes the adjusted traceback path of the matrix c. Parameters ---------- ac: nd-array The accumulated cost matrix. Returns ------- Coordinates p and q of the minimum path adjusted. """ i, j = np.array(ac.shape) - 2 p, q = [i], [j] while (i > 0) and (j > 0): tb = 0 if ac[i, j + 1] < ac[i, j]: tb = 1 if ac[i + 1, j] < ac[i, j + tb]: tb = 2 if tb == 0: i -= 1 j -= 1 elif tb == 1: i -= 1 else: # (tb == 2): j -= 1 p.insert(0, i) q.insert(0, j) while i > 0: i -= 1 p.insert(0, i) q.insert(0, j) return np.array(p), np.array(q)
[docs]def backtracking(ac): """Compute the most cost-efficient path. Parameters ---------- ac: nd-array The accumulated cost matrix. Returns ------- Coordinates of the most cost-efficient path. """ x = np.shape(ac) i = x[0] - 1 j = x[1] - 1 # The indices of the paths are save in opposite direction # path = np.ones((i + j, 2 )) * np.inf; best_path = [] steps = 0 while i != 0 or j != 0: best_path.append((i - 1, j - 1)) C = np.ones((3, 1)) * np.inf # Keep data points in both time series C[0] = ac[i - 1, j - 1] # Deletion in A C[1] = ac[i - 1, j] # Deletion in B C[2] = ac[i, j - 1] # Find the index for the lowest cost idx = np.argmin(C) if idx == 0: # Keep data points in both time series i = i - 1 j = j - 1 elif idx == 1: # Deletion in A i = i - 1 j = j else: # Deletion in B i = i j = j - 1 steps = steps + 1 best_path.append((i - 1, j - 1)) best_path.reverse() best_path = np.array(best_path[1:]) return best_path[:, 0], best_path[:, 1]
# DTW SW
[docs]def dtw_sw(x, y, winlen, alpha=0.5, **kwargs): """Computes Dynamic Time Warping (DTW) of two time series using a sliding window. TODO: Check if this needs to be speed up. Parameters ---------- x: nd-array Time series x (query). y: nd-array Time series y. winlen: int The sliding window length. alpha: float A factor between 0 and 1 which weights the amplitude and derivative contributions. A higher value will favor amplitude and a lower value will favor the first derivative. \**kwargs: See below: * *do_sign_norm* (``bool``) -- If ``True`` the signals will be normalized before computing the DTW, (default: ``False``) * *do_dist_norm* (``bool``) -- If ``True`` the DTW distance will be normalized by dividing the summation of the path dimension. (default: ``True``) * *window* (``String``) -- Selects the global window constrains. Available options are ``None`` and ``sakoe-chiba``. (default: ``None``) * *factor* (``Float``) -- Selects the global constrain factor. (default: ``min(xl, yl) * .50``) Returns ------- d: float The SW-DTW distance. c: nd-array The local cost matrix. ac: nd-array The accumulated cost matrix. path: nd-array The optimal warping path between the two sequences. """ xl, yl = len(x), len(y) do_sign_norm = kwargs.get("normalize", False) do_dist_norm = kwargs.get("dist_norm", True) window = kwargs.get("window", None) factor = kwargs.get("factor", np.min((xl, yl)) * 0.50) if do_sign_norm: x, y = standardization(x), standardization(y) ac = np.zeros((xl + 1, yl + 1)) ac[0, 1:] = np.inf ac[1:, 0] = np.inf tmp_ac = ac[1:, 1:] nx = get_mirror(x, winlen) ny = get_mirror(y, winlen) dnx = np.diff(nx) dny = np.diff(ny) nx = nx[:-1] ny = ny[:-1] # Workaround to deal with even window sizes if winlen % 2 == 0: winlen -= 1 swindow = np.hamming(winlen) swindow = swindow / np.sum(swindow) for i in range(xl): for j in range(yl): pad_i, pad_j = i + winlen, j + winlen # No window selected if window is None: tmp_ac[i, j] = sliding_dist( nx[pad_i - (winlen // 2) : pad_i + (winlen // 2) + 1], ny[pad_j - (winlen // 2) : pad_j + (winlen // 2) + 1], dnx[pad_i - (winlen // 2) : pad_i + (winlen // 2) + 1], dny[pad_j - (winlen // 2) : pad_j + (winlen // 2) + 1], alpha, swindow, ) # Sakoe-Chiba band elif window == "sakoe-chiba": if abs(i - j) < factor: tmp_ac[i, j] = sliding_dist( nx[pad_i - (winlen // 2) : pad_i + (winlen // 2) + 1], ny[pad_j - (winlen // 2) : pad_j + (winlen // 2) + 1], dnx[pad_i - (winlen // 2) : pad_i + (winlen // 2) + 1], dny[pad_j - (winlen // 2) : pad_j + (winlen // 2) + 1], alpha, swindow, ) else: tmp_ac[i, j] = np.inf # As last resource, the complete window is calculated else: tmp_ac[i, j] = sliding_dist( nx[pad_i - (winlen / 2) : pad_i + (winlen / 2) + 1], ny[pad_j - (winlen / 2) : pad_j + (winlen / 2) + 1], dnx[pad_i - (winlen / 2) : pad_i + (winlen / 2) + 1], dny[pad_j - (winlen / 2) : pad_j + (winlen / 2) + 1], alpha, swindow, ) c = tmp_ac.copy() for i in range(xl): for j in range(yl): tmp_ac[i, j] += min([ac[i, j], ac[i, j + 1], ac[i + 1, j]]) path = traceback(ac) if do_dist_norm: d = ac[-1, -1] / np.sum(np.shape(path)) else: d = ac[-1, -1] return d, c, ac, path
[docs]def sliding_dist(xw, yw, dxw, dyw, alpha, win): """Computes the sliding distance. Parameters ---------- xw: nd-array x coords window. yw: nd-array y coords window. dxw: nd-array x coords diff window. dyw: nd-array y coords diff window. alpha: float Rely more on absolute or difference values 1- abs, 0 - diff. win: nd-array Signal window used for sliding distance. Returns ------- Sliding distance """ return (1 - alpha) * np.sqrt(np.sum((((dxw - dyw) * win) ** 2.0))) + alpha * np.sqrt( np.sum((((xw - yw) * win) ** 2.0)) )
[docs]def get_mirror(s, ws): """Performs a signal windowing based on a double inversion from the start and end segments. Parameters ---------- s: nd-array the input-signal. ws: int window size. Returns ------- Signal windowed """ return np.r_[2 * s[0] - s[ws:0:-1], s, 2 * s[-1] - s[-2 : -ws - 2 : -1]]
@njit() def _lcss_point_dist(x, y): """ Parameters ---------- x: nd-array Time series x (query). y: nd-array Time series y. Returns ------- The LCSS distance. """ dist = 0.0 for di in range(x.shape[0]): diff = x[di] - y[di] dist += diff * diff return dist ** 0.5
[docs]def lcss_accumulated_matrix(x, y, eps): """Computes the LCSS cost matrix using the euclidean distance (dist) between two time series. Parameters ---------- x: nd-array Time series x (query). y: nd-array Time series y. eps : float Amplitude matching threshold. Returns ------- ac : nd-array The accumulated cost matrix. """ xl, yl = len(x), len(y) ac = np.zeros((xl + 1, yl + 1)) for i in range(1, xl + 1): for j in range(1, yl + 1): if _lcss_point_dist(x[i - 1, :], y[j - 1, :]) <= eps: ac[i, j] = 1 + ac[i - 1, j - 1] else: ac[i, j] = max(ac[i, j - 1], ac[i - 1, j]) return ac
[docs]def lcss_path(x, y, c, eps): """Computes the LCSS path between two time series. Parameters ---------- x: nd-array The reference signal. y: nd-array The estimated signal. c : nd-array The cost matrix. eps : float Matching threshold. Returns ------- Coordinates of the minimum LCSS path. """ i, j = len(x), len(y) path = [] while i > 0 and j > 0: if _lcss_point_dist(x[i - 1, :], y[j - 1, :]) <= eps: path.append((i - 1, j - 1)) i -= 1 j -= 1 elif c[i - 1, j] > c[i, j - 1]: i -= 1 else: j -= 1 path = np.array(path[::-1]) return path[1:, 0], path[1:, 1]
[docs]def lcss_score(c): """Computes the LCSS similarity score between two time series. Parameters ---------- c : nd-array The cost matrix. Returns ------- The LCSS score. """ xl = c.shape[0] - 1 yl = c.shape[1] - 1 return float(c[-1, -1]) / min([xl, yl])