Source code for clustviz.clara

# akalino GITHUB
# https://github.com/akalino/Clustering/blob/master/clara.py

import random
from typing import Tuple, Union, Dict, Any, Iterable

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from clustviz.utils import COLOR_DICT, annotate_points


[docs]class ClaraClustering: """ The clara clustering algorithm. Basically an iterative guessing version of k-medoids that makes things a lot faster for bigger data sets. """ def __init__(self, max_iter: int = 100_000): """ Class initialization. :param max_iter: The default number of max iterations. """ self.max_iter = max_iter self.dist_cache = dict()
[docs] def clara(self, _df: pd.DataFrame, _k: int, _fn: str) -> Tuple[float, list, Union[dict, Dict[Any, list]]]: """ The main clara clustering iterative algorithm. :param _df: Input dataframe. :param _k: Number of medoids. :param _fn: The distance function to use. :return: The minimized cost, the best medoid choices and the final configuration. """ size = len(_df) if size > 100_000: niter = 1_000 runs = 1 else: niter = self.max_iter runs = 5 # initialize min_avg_cost to infinity min_avg_cost = np.inf best_choices = [] best_results = {} for j in range(runs): # usually 5 times print("\n") print("run number: ", j) # take 40+_k*2 random indexes from input data sampling_idx = random.sample([i for i in range(size)], 40 + _k * 2) # take the corresponding rows from input dataframe _df prov_dic = {i: sampling_idx[i] for i in range(40 + _k * 2)} print(prov_dic) sampling_data = [] for idx in sampling_idx: sampling_data.append(_df.iloc[idx]) # create the sample dataframe sampled_df = pd.DataFrame(sampling_data, index=sampling_idx) # return total cost, medoids and clusters of sampled_df pre_cost, pre_choice, pre_medoids = self.k_medoids( sampled_df, _k, _fn, niter ) plot_pam_mod(sampled_df, pre_medoids, _df) print("RESULTS OF K-MEDOIDS") print("pre_cost: ", pre_cost) print("pre_choice: ", pre_choice) print("pre_medoids: ", pre_medoids) # compute average cost and clusters of whole input dataframe tmp_avg_cost, tmp_medoids = self.average_cost(_df, _fn, pre_choice) print("RESULTS OF WHOLE DATASET EVALUATION") print("tmp_avg_cost: ", tmp_avg_cost) print("tmp_medoids: ", tmp_medoids) # if the new cost is lower if tmp_avg_cost < min_avg_cost: print( "new_cost is lower, from {0} to {1}".format( round(min_avg_cost, 4), round(tmp_avg_cost, 4) ) ) min_avg_cost = tmp_avg_cost best_choices = list(pre_choice) best_results = dict(tmp_medoids) elif tmp_avg_cost == min_avg_cost: print("new_cost is equal") else: print("new_cost is higher") print("\n") print("FINAL RESULT:") plot_pam_mod(_df, best_results, _df) return min_avg_cost, best_choices, best_results
[docs] def k_medoids(self, _df: pd.DataFrame, _k: int, _fn: str, _niter: int) -> Tuple[float, list, Union[Dict[Any, list], dict]]: """ The original k-medoids algorithm. :param _df: Input data frame. :param _k: Number of medoids. :param _fn: The distance function to use. :param _niter: The number of iterations. :return: Cost of configuration, the medoids (list) and the clusters (dictionary). Pseudo-code for the k-medoids algorithm. 1. Sample k of the n data points as the medoids. 2. Associate each data point to the closest medoid. 3. While the cost of the data point space configuration is decreasing: - For each medoid m and each non-medoid point o: -- Swap m and o, recompute cost. -- If global cost increased, swap back. """ # Do some smarter setting of initial cost configuration _, medoids_sample = self.cheat_at_sampling(_df, _k, _fn, 17) print("initial medoids sample: ", medoids_sample) prior_cost, medoids = self.compute_cost(_df, _fn, medoids_sample) current_cost = prior_cost print("current_cost: ", current_cost) iter_count = 0 best_choices = [] best_results = {} # print('Running with {m} iterations'.format(m=_niter)) while iter_count < _niter: for m in medoids: clust_iter = 0 for item in medoids[m]: if item != m: idx = medoids_sample.index(m) swap_temp = medoids_sample[idx] medoids_sample[idx] = item tmp_cost, tmp_medoids = self.compute_cost( _df, _fn, medoids_sample, True ) if (tmp_cost < current_cost) & (clust_iter < 1): best_choices = list(medoids_sample) best_results = dict(tmp_medoids) current_cost = tmp_cost clust_iter += 1 else: best_choices = best_choices best_results = best_results current_cost = current_cost medoids_sample[idx] = swap_temp iter_count += 1 if best_choices == medoids_sample: print("Best configuration found! best_choices: ", best_choices) break if current_cost <= prior_cost: if current_cost < prior_cost: print( "Better configuration found! curr_cost:{0}, prior_cost:{1}".format( round(current_cost, 2), round(prior_cost, 2) ) ) else: print("Equal cost") prior_cost = current_cost medoids = best_results medoids_sample = best_choices print("new_medoids: ", best_choices) return current_cost, best_choices, best_results
[docs] def compute_cost(self, _df: pd.DataFrame, _fn: str, _cur_choice: list, cache_on: bool = False) -> Tuple[float, Dict[Any, list]]: """ A function to compute the configuration cost. :param _df: The input dataframe. :param _fn: The distance function. :param _cur_choice: The current set of medoid choices. :param cache_on: Binary flag to turn caching. :return: The total configuration cost, the medoids. """ total_cost = 0.0 medoids = {} for idx in _cur_choice: medoids[idx] = [] for i in list(_df.index): choice = -1 min_cost = np.inf for m in medoids: if cache_on: tmp = self.dist_cache.get((m, i), None) if not cache_on or tmp is None: if _fn == "manhattan": tmp = self.manhattan_distance(_df.loc[m], _df.loc[i]) elif _fn == "cosine": tmp = self.cosine_distance(_df.loc[m], _df.loc[i]) elif _fn == "euclidean": tmp = self.euclidean_distance(_df.loc[m], _df.loc[i]) elif _fn == "fast_euclidean": tmp = self.fast_euclidean(_df.loc[m], _df.loc[i]) else: print( "You need to input a valid distance function (manhattan, cosine, euclidean or " "fast_euclidean)." ) if cache_on: self.dist_cache[(m, i)] = tmp if tmp < min_cost: choice = m min_cost = tmp medoids[choice].append(i) total_cost += min_cost # print("total_cost: ", total_cost) return total_cost, medoids
[docs] def average_cost(self, _df: pd.DataFrame, _fn: str, _cur_choice: list): """ A function to compute the average cost. :param _df: The input data frame. :param _fn: The distance function. :param _cur_choice: The current medoid candidates. :return: The average cost, the new medoids. """ _tc, _m = self.compute_cost(_df, _fn, _cur_choice) avg_cost = _tc / len(_m) return avg_cost, _m
[docs] def cheat_at_sampling(self, _df: pd.DataFrame, _k: int, _fn: str, _nsamp: int) -> Tuple[float, list]: """ A function to cheat at sampling for speed ups. :param _df: The input dataframe. :param _k: The number of medoids. :param _fn: The distance function. :param _nsamp: The number of samples. :return: The best score, the medoids. """ # this function tries _nsamp different configurations of initial medoids and chooses the one with the lowest # cost print("cheating at sampling") score_holder = [] medoid_holder = [] for _ in range(_nsamp): # 17 by default # take _k random points as medoids_sample medoids_sample = random.sample(list(_df.index), _k) # compute cost and medoids with this medoids_sample prior_cost, medoids = self.compute_cost( _df, _fn, medoids_sample, True ) # store the cost and medoids score_holder.append(prior_cost) medoid_holder.append(medoids) # take the minimum cost and the corresponding medoids idx = score_holder.index(min(score_holder)) ms = medoid_holder[idx].keys() return score_holder[idx], list(ms)
[docs] @staticmethod def euclidean_distance(v1: Iterable, v2: Iterable) -> float: """ Slow function for computing euclidean distance. :param v1: The first vector. :param v2: The second vector. :return: The euclidean distance between v1 and v2. """ dist = 0 for a1, a2 in zip(v1, v2): dist += abs(a1 - a2) ** 2 return dist
[docs] @staticmethod def fast_euclidean(v1: np.ndarray, v2: np.ndarray) -> float: """ Faster function for euclidean distance. :param v1: The first vector. :param v2: The second vector. :return: The euclidean distance between v1 and v2. """ return np.linalg.norm(v1 - v2)
[docs] @staticmethod def manhattan_distance(v1: Iterable, v2: Iterable) -> float: """ Function for manhattan distance. :param v1: The first vector. :param v2: The second vector. :return: The manhattan distance between v1 and v2. """ dist = 0 for a1, a2 in zip(v1, v2): dist += abs(a1 - a2) return dist
[docs] @staticmethod def cosine_distance(v1: Iterable, v2: Iterable) -> float: """ Function for cosine distance. :param v1: The first vector. :param v2: The second vector. :return: The cosine distance between v1 and v2. """ xx, yy, xy = 0, 0, 0 for a1, a2 in zip(v1, v2): xx += a1 * a1 yy += a2 * a2 xy += a1 * a2 return float(xy) / np.sqrt(xx * yy)
[docs]def plot_pam_mod(data: pd.DataFrame, cl: dict, full: pd.DataFrame, equal_axis_scale: bool = False) -> None: """ Scatterplot of data points, with colors according to cluster labels. Only sampled points are plotted, the others are only displayed with their indexes; moreover, centers of mass of the clusters are marked with an X. :param data: input data sample. :param cl: cluster dictionary. :param full: full input dataframe. :param equal_axis_scale: if True, axis are plotted with the same scaling. """ _, ax = plt.subplots(figsize=(14, 6)) # just as placeholder, it actually doesnt plot anything because points are white with white edgecolor plt.scatter( full.iloc[:, 0], full.iloc[:, 1], s=300, color="white", edgecolor="white", ) # plot the sampled point, with colors according to the cluster they belong to for i, el in enumerate(cl.values()): plt.scatter( data.loc[el, 0], data.loc[el, 1], s=300, color=COLOR_DICT[i % len(COLOR_DICT)], edgecolor="black", ) # plot centers of mass, marked with an X for i, el in enumerate(cl.keys()): plt.scatter( data.loc[el, 0], data.loc[el, 1], s=500, color="red", marker="X", edgecolor="black", ) # plot indexes of points in plot annotate_points(annotations=range(len(full)), points=np.array(full), ax=ax) if equal_axis_scale is True: ax.set_aspect("equal", adjustable="box") plt.show()