Source code for patterns

import numpy as np
import pandas as pd
import matrixprofile as mpf
from stumpy import stump, stumped, mstump, mstumped, fluss, subspace
from tqdm import tqdm

def pick_subspace_columns(df, mps, idx, k, m, include):
    """
    Given a multi-dimensional time series as a pandas DataFrame, keep only the
    columns that have been used for the creation of the k-dimensional matrix
    profile.

    Args:
        df: The DataFrame that contains the multi-dimensional time series.
        mps: The multi-dimensional matrix profile. Each row of the array
            corresponds to the matrix profile for a given dimension (i.e., the
            first row is the 1-D matrix profile and the second row is the 2-D
            matrix profile).
        idx: The multi-dimensional matrix profile index, where each row of the
            array corresponds to the matrix profile index for a given dimension.
        k: If mps and idx are one-dimensional, k can be used to specify the
            given dimension of the matrix profile. The default value specifies
            the 1-D matrix profile. If mps and idx are multi-dimensional, k is
            ignored.
        m: The subsequence window size. Should be the same as the one used to
            create the multi-dimensional matrix profile that is the input.
        include: A list of the column names that must be included in the
            constrained multi-dimensional motif search.

    Return:
        The list of subspace columns.
    """
    motifs_idx = np.argsort(mps, axis=1)[:, :2]
    col_indexes = []
    for n in include:
        col_indexes.append(df.columns.get_loc(n))

    print(f'Include dimensions: {include}, indexes in df = {col_indexes}')
    S = subspace(df, m, motifs_idx[k][0], idx[k][motifs_idx[k][0]], k, include=col_indexes)
    print(f"For k = {k}, the {k + 1}-dimensional subspace includes subsequences from {df.columns[S].values}")
    subspace_cols = list(df.columns[S].values)
    return subspace_cols
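
# A minimal usage sketch (illustrative only): the DataFrame, window size and
# included column below are hypothetical. It assumes the multi-dimensional
# profile was computed with stumpy's mstump on the transposed values, so that
# each row of `mps`/`idx` corresponds to one dimension.
def _example_pick_subspace_columns():
    df = pd.DataFrame(np.random.rand(1000, 3), columns=['a', 'b', 'c'])
    m = 50
    mps, idx = mstump(df.to_numpy().T, m)
    # keep only the columns participating in the 2-dimensional motif (k=1),
    # forcing column 'a' into the constrained search
    return pick_subspace_columns(df, mps, idx, k=1, m=m, include=['a'])
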
def to_mpf(mp, index, window, ts):
    """
    Using a matrix profile, a matrix profile index, the window size and the
    time series used to calculate them, create a matrix profile object that is
    compatible with the matrix profile foundation library
    (https://github.com/matrix-profile-foundation/matrixprofile). This is
    useful for cases where another library was used to generate the matrix
    profile.

    Args:
        mp: A matrix profile.
        index: The matrix profile index that accompanies the matrix profile.
        window: The subsequence window size.
        ts: The time series that was used to calculate the matrix profile.

    Return:
        The MatrixProfile structure.
    """
    mp_mpf = mpf.utils.empty_mp()
    mp_mpf['mp'] = np.array(mp)
    mp_mpf['pi'] = np.array(index)
    mp_mpf['metric'] = 'euclidean'
    mp_mpf['w'] = window
    mp_mpf['ez'] = 0
    mp_mpf['join'] = False
    mp_mpf['sample_pct'] = 1
    mp_mpf['data']['ts'] = np.array(ts).astype('d')
    mp_mpf['algorithm'] = 'mpx'
    return mp_mpf
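
# A minimal sketch of converting a stumpy profile into the matrixprofile
# foundation structure; the series `ts` and window size are hypothetical.
def _example_to_mpf():
    ts = np.random.rand(1000)
    m = 50
    mp = stump(ts, m)  # columns 0 and 1 hold the distances and indexes
    return to_mpf(mp[:, 0], mp[:, 1], m, ts)
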
def compute_mp_av(mp, index, m, df, k):
    """
    Given a matrix profile, a matrix profile index, the window size and the
    DataFrame that contains the time series, create a matrix profile object and
    add the corrected matrix profile after applying the complexity annotation
    vector. Uses an extended version of the apply_av function from the matrix
    profile foundation library that is compatible with multi-dimensional time
    series. The implementation can be found here
    (https://github.com/MORE-EU/matrixprofile/blob/master/matrixprofile/transform.py).

    Args:
        mp: A matrix profile.
        index: The matrix profile index that accompanies the matrix profile.
        m: The subsequence window size.
        df: The time series that was used to calculate the matrix profile.
        k: The dimension of the matrix profile (currently unused).

    Return:
        Updated profile with an annotation vector.
    """
    # Apply the annotation vector: replace NaNs with the largest finite
    # value so they do not propagate into the corrected profile.
    mp = np.nan_to_num(mp, nan=np.nanmax(mp))
    profile = to_mpf(mp, index, m, df)
    av_type = 'complexity'
    profile = mpf.transform.apply_av(profile, av_type)
    return profile
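
# A minimal sketch of applying the complexity annotation vector to a stumpy
# profile; the series, window size and the (unused) k below are hypothetical.
def _example_compute_mp_av():
    ts = np.random.rand(1000)
    m = 50
    mp = stump(ts, m)
    # the distances come back as an object array, so cast before the NaN fix
    return compute_mp_av(mp[:, 0].astype(float), mp[:, 1], m, ts, k=0)
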
def pattern_loc(start, end, mask, segment_labels):
    """
    Determine the region label of a pattern, given a time series that is
    characterized by regions belonging to two different labels.

    Args:
        start: The starting index of the pattern.
        end: The ending index of the pattern.
        mask: Binary mask used to annotate the time series.
        segment_labels: List of the two labels that characterize the time series.

    Return:
        The label name of the region that the pattern is contained in.
    """
    if len(segment_labels) != 2:
        raise ValueError('segment_labels must contain exactly 2 labels')

    # the first label in the list is assigned to the True regions in the mask
    true_label = segment_labels[0]
    # the second label in the list is assigned to the False regions in the mask
    false_label = segment_labels[1]

    if mask[start] == mask[end]:
        if mask[start] == True:
            loc = true_label
        else:
            loc = false_label
    else:
        # if a pattern spans both regions, return the label 'both'
        loc = 'both'
    return loc
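
# A small illustration with a hypothetical mask whose first half is True: a
# pattern fully inside the True half gets the first label.
def _example_pattern_loc():
    mask = np.array([True] * 500 + [False] * 500)
    return pattern_loc(100, 149, mask, ['class_a', 'class_b'])  # -> 'class_a'
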
def calc_cost(cl1_len, cl2_len, num_cl1, num_cl2):
    """
    Assign a cost to a pattern based on whether the majority of its occurrences
    are observed in regions of a time series that are annotated with the same
    binary label. The cost calculation takes into account a possible difference
    in the total lengths of the segments.

    Args:
        cl1_len: Total length of the time series regions that belong to class 1.
        cl2_len: Total length of the time series regions that belong to class 2.
        num_cl1: Number of occurrences of the pattern in regions that belong to class 1.
        num_cl2: Number of occurrences of the pattern in regions that belong to class 2.

    Return:
        The cost of the pattern, as well as the normalized numbers of occurrences.
    """
    if num_cl1 + num_cl2 <= 2:
        return 1.0, None, None
    if cl1_len == 0 or cl2_len == 0:
        return 1.0, None, None

    # scale the class-1 count by the ratio of the segment lengths
    f = cl1_len / cl2_len
    norm_cl1 = num_cl1 / f
    norm_cl2 = num_cl2
    cost = 1 - (abs(norm_cl1 - norm_cl2) / (norm_cl1 + norm_cl2))
    return cost, norm_cl1, norm_cl2
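
# A worked example with hypothetical numbers: segments of length 800 and 200
# give f = 4, so 8 occurrences in class 1 normalize to 8 / 4 = 2 against 2 in
# class 2, and the cost is 1 - |2 - 2| / (2 + 2) = 1.0, i.e. the pattern does
# not discriminate between the classes (a cost near 0 would).
def _example_calc_cost():
    cost, norm_cl1, norm_cl2 = calc_cost(800, 200, 8, 2)
    return cost  # 1.0
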
def calculate_motif_stats(p, mask, k, m, ez, radius, segment_labels):
    """
    Calculate some useful statistics for the motifs found.

    Args:
        p: A profile object as it is defined in the matrixprofile foundation
            python library.
        mask: Binary mask used to annotate the time series.
        k: The dimension of the matrix profile (currently unused).
        m: The window size (length of the motif).
        ez: The exclusion zone used.
        radius: The radius that has been used in the experiment.
        segment_labels: List of the two labels that characterize the time series.

    Return:
        List of the statistics.
    """
    if len(segment_labels) != 2:
        raise ValueError('segment_labels must contain exactly 2 labels')

    # the first label in the list is assigned to the True regions in the mask
    true_label = segment_labels[0]
    # the second label in the list is assigned to the False regions in the mask
    false_label = segment_labels[1]

    output_list = []
    cls1_len = np.count_nonzero(mask)
    cls2_len = abs(mask.shape[0] - cls1_len)

    for i in range(0, len(p['motifs'])):
        idx, nn1 = p['motifs'][i]['motifs']
        neighbors = p['motifs'][i]['neighbors']
        motif_pair = p['motifs'][i]['motifs']
        start = idx
        end = idx + m - 1

        nn_idx_start = []
        nn_idx_end = []
        for neighbor in neighbors:
            nn_idx_start.append(neighbor)
            nn_idx_end.append(neighbor + m - 1)

        cls1_count = 0
        cls2_count = 0
        spanning_both = 0
        for nn_start, nn_end in zip(nn_idx_start, nn_idx_end):
            location_in_ts = pattern_loc(nn_start, nn_end, mask, segment_labels)
            if location_in_ts == true_label:
                cls1_count += 1
            elif location_in_ts == false_label:
                cls2_count += 1
            else:
                spanning_both += 1

        motif_location = pattern_loc(start, end, mask, segment_labels)
        if motif_location == true_label:
            cls1_count += 1
        elif motif_location == false_label:
            cls2_count += 1

        nearest_neighbor_location = pattern_loc(nn1, nn1 + m - 1, mask, segment_labels)
        if nearest_neighbor_location == true_label:
            cls1_count += 1
        elif nearest_neighbor_location == false_label:
            cls2_count += 1

        cost, norm_cls1, norm_cls2 = calc_cost(cls1_len, cls2_len, cls1_count, cls2_count)

        maj = ''
        if norm_cls1 is None and norm_cls2 is None:
            maj = 'None'
        elif norm_cls1 == norm_cls2:
            maj = 'None'
        elif norm_cls1 > norm_cls2:
            maj = true_label
        elif norm_cls1 < norm_cls2:
            maj = false_label

        output_list.append([i + 1, motif_location, nearest_neighbor_location,
                            cls1_count, cls2_count, cost, m, ez, radius,
                            motif_pair, maj])

    return output_list
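
# A minimal sketch of scoring motifs against a hypothetical binary mask. It
# assumes the matrixprofile foundation discover.motifs API, which fills
# profile['motifs'] with 'motifs' (the pair) and 'neighbors' entries.
def _example_calculate_motif_stats():
    ts = np.random.rand(1000)
    m, ez, radius = 50, 25, 2
    mp = stump(ts, m)
    profile = to_mpf(mp[:, 0].astype(float), mp[:, 1], m, ts)
    profile = mpf.discover.motifs(profile, exclusion_zone=ez)
    mask = np.array([True] * 500 + [False] * 500)
    return calculate_motif_stats(profile, mask, k=0, m=m, ez=ez,
                                 radius=radius, segment_labels=['on', 'off'])
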
Return: """ if len(segment_labels) != 2: raise ValueError('segment_labels must contain exactly 2 labels') # the first label in the list will be assigned to for the True regions in the mask true_label = segment_labels[0] # the second label in the list will be assigned to for the False regions in the mask false_label = segment_labels[1] cls1_len = np.count_nonzero(mask) cls2_len = abs(mask.shape[0] - cls1_len) neighbors = nn nn_idx_start = [] nn_idx_end = [] for neighbor in neighbors: nn_idx_start.append(neighbor) nn_idx_end.append(neighbor + m - 1) cls1_count = 0 cls2_count = 0 spanning_both = 0 for nn_start, nn_end in zip(nn_idx_start, nn_idx_end): location_in_ts = pattern_loc(nn_start, nn_end, mask, segment_labels) if location_in_ts == true_label: cls1_count += 1 elif location_in_ts == false_label: cls2_count += 1 else: spanning_both += 1 cost, norm_cls1, norm_cls2 = calc_cost(cls1_len, cls2_len, cls1_count, cls2_count) maj = '' if norm_cls1 == norm_cls2: maj = 'None' elif norm_cls1 is None and norm_cls2 is None: maj = 'None' elif norm_cls1 > norm_cls2: maj = true_label elif norm_cls1 < norm_cls2: maj = false_label matching_maj = (maj_other == maj) return [nn, cls1_count, cls2_count, ez, cost, matching_maj] def create_mp(df, motif_len, column, path, dask=True): """ Create and Save a univariate/multidimensional matrix profile as a pair of npz files. Input is based on the output of (https://stumpy.readthedocs.io/en/latest/api.html#mstump) Args: df: The DataFrame that contains the multidimensional time series. motif_len: The subsequence window size. columns: A list of the column indexes that are included in the comptutation univariate/multidimensional profile. path: Path of the directory where the file will be saved. dask: A Dask Distributed client that is connected to a Dask scheduler and Dask workers Return: Matrix profile distances, matrix profile indexes """ column1=str(column) if len(column1)<2: if dask==True: from dask.distributed import Client, LocalCluster with Client(scheduler_port=8782, dashboard_address=None, processes=False, n_workers=4, threads_per_worker=2, memory_limit='50GB') as dask_client: mps=stumped(dask_client, df.iloc[:,column], motif_len)# Note that a dask client is needed if(path): np.savez_compressed(path,mp=mps[:,0],mpi=mps[:,1] ) print('Univariate with Dask') return mps[:,0],mps[:,1] mps = stump(df.iloc[:,column], motif_len) if(path): np.savez_compressed(path, mp=mps[:,0],mpi=mps[:,1]) print('Uvivariate without Dask') return mps[:,0],mps[:,1] else: if dask==True: from dask.distributed import Client, LocalCluster with Client(scheduler_port=8782, dashboard_address=None, processes=False, n_workers=4, threads_per_worker=2, memory_limit='50GB') as dask_client: mps,indices = mstumped(dask_client, df.iloc[:,column], motif_len) # Note that a dask client needed if(path): np.savez_compressed(path, mp=mps, mpi=indices) print('Multivariate with Dask') return mps, indices mps,indices = mstump(df.iloc[:,column], motif_len) if(path): np.savez_compressed(path, mp=mps, mpi=indices) print('Multivariate without Dask') return mps, indices def change_points(mpi, L, path, excl_factor=5, change_points=4): """ Calculation of total change points(segments) we want to divide our region with respect to a computed Univariate Matrix Profile. This procedure is illustated through the Fluss Algorithm (https://stumpy.readthedocs.io/en/latest/_modules/stumpy/floss.html#fluss). We input a L which is a list of integers. The L is a factor which excludes change point detection. 
def change_points(mpi, L, path, excl_factor=5, change_points=4):
    """
    Calculate the change points (segments) that divide a region, with respect
    to a computed univariate matrix profile. This procedure relies on the FLUSS
    algorithm
    (https://stumpy.readthedocs.io/en/latest/_modules/stumpy/floss.html#fluss).
    L is a list of integer subsequence lengths. Around every identified regime,
    change point detection is excluded and the arc curve is set to 1 within a
    zone of size L multiplied by the exclusion factor (excl_factor). This
    function also works for multi-dimensional matrix profile indexes; the user
    just needs to specify the row of mpi, e.g. mpi[3] to look for change points
    in the 4th dimension.

    Args:
        mpi: The one-dimensional matrix profile index, where the array
            corresponds to the matrix profile index for a given dimension.
        L: A list of candidate subsequence lengths, each set roughly to one
            period length. This is likely the same value as the motif_len used
            to compute the matrix profile and matrix profile index.
        excl_factor: The multiplying factor for the regime exclusion zone.
        change_points: Number of segments that our space is going to be
            divided into.
        path: Path of the file where the output will be saved. If falsy,
            nothing is saved.

    Return:
        The locations (indexes) of the change points and the arc curve for
        each value of L.
    """
    regimes = [change_points]
    output = dict()
    print("Computing regimes..")
    for l in tqdm(L):
        output[l] = [fluss(mpi, L=int(l), n_regimes=int(r), excl_factor=excl_factor)
                     for r in regimes]
    if path:
        np.save(path, output)
    print("Done")
    return output


def change_points_md(mpi, k_optimal, path, L=None, change_points=4, excl_factor=5):
    """
    Calculate the change points (segments) that divide a region, with respect
    to a computed multivariate matrix profile. This procedure relies on the
    FLUSS algorithm
    (https://stumpy.readthedocs.io/en/latest/_modules/stumpy/floss.html#fluss).
    L is a list of integer subsequence lengths. Around every identified regime,
    change point detection is excluded and the arc curve is set to 1 within a
    zone of size L multiplied by the exclusion factor (excl_factor). The
    function iterates over the optimal dimensions given by the elbow method, so
    it returns the locations (indexes) of the change points and the arc curve
    contained in a specific L for each column/dimension.

    Args:
        mpi: The multi-dimensional matrix profile index, where each row of the
            array corresponds to the matrix profile index for a given dimension.
        k_optimal: The optimal number of dimensions given by the elbow method,
            or all of the DataFrame dimensions.
        L: A list of candidate subsequence lengths, each set roughly to one
            period length. This is likely the same value as the motif_len used
            to compute the matrix profile and matrix profile index. If None, a
            default grid of candidate lengths is used.
        change_points: Number of segments that our space is going to be
            divided into.
        excl_factor: The multiplying factor for the regime exclusion zone.
        path: Path of the file where the output will be saved. If falsy,
            nothing is saved.

    Return:
        The locations (indexes) of the change points and the arc curve
        contained in a specific L, for each dimension of the DataFrame.
    """
    no_cols = np.arange(1, k_optimal + 1, 1)
    if L is None:
        L = np.arange(1000, 50000, 1000).astype(int)
    regimes = [change_points]
    output = dict()
    for c in tqdm(no_cols):
        output[c] = dict()
        for l in L:
            output[c][l] = [fluss(mpi[c - 1], L=int(l), n_regimes=int(r), excl_factor=excl_factor)
                            for r in regimes]
    if path:
        np.save(path, output)
    return output
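
# A minimal sketch of segmenting a hypothetical univariate series into 4
# regimes with a single candidate L; path=None skips saving.
def _example_change_points():
    ts = np.random.rand(5000)
    m = 50
    mp = stump(ts, m)
    mpi = mp[:, 1].astype(np.int64)  # cast the object-typed index column
    return change_points(mpi, L=[m], path=None, excl_factor=5, change_points=4)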