Source code for clustviz.pam

# SachinKalsi GITHUB
# https://github.com/SachinKalsi/kmedoids/blob/master/KMedoids.py
from typing import Optional

from scipy.sparse import csr_matrix
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import pprint
from clustviz.utils import COLOR_DICT, annotate_points


[docs]class KMedoids: def __init__( self, n_cluster: int = 2, max_iter: int = 10, tol: float = 0.1, start_prob: float = 0.8, end_prob: float = 0.99, random_state: Optional[int] = 42, ): """Kmedoids constructor called.""" if ( start_prob < 0 or start_prob >= 1 or end_prob < 0 or end_prob >= 1 or start_prob > end_prob ): raise ValueError("Invalid input") self.n_cluster = n_cluster self.max_iter = max_iter self.tol = tol self.start_prob = start_prob self.end_prob = end_prob self.medoids = [] # empty medois self.clusters = {} # empty clusters self.tol_reached = float("inf") self.current_distance = 0 self.__data = None self.__is_csr = None self.__rows = 0 self.__columns = 0 self.cluster_distances = {} self.__random_state = random_state
[docs] def fit(self, data): print("fitting") self.__data = data self.__set_data_type() self.__start_algo()
def __start_algo(self): print("starting algo") self.__initialize_medoids() # choosing initial medoids # computing clusters and cluster_distances self.clusters, self.cluster_distances = self.__calculate_clusters( self.medoids ) # print cluster and cluster_distances print("clusters: ") pprint.PrettyPrinter(indent=2, compact=True).pprint(self.clusters) print("clusters_distances: ", self.cluster_distances) plot_pam(self.__data, self.clusters) self.__update_clusters() def __update_clusters(self): for i in range( self.max_iter ): # to stop if convergence isn't reached whithin max_iter iterations print("\n") print("iteration n°: ", i) # compute distance obtained by swapping medoids in the clusters cluster_dist_with_new_medoids = (self.__swap_and_recalculate_clusters()) # if the new sum of cluster_distances is smaller than the old one if ( self.__is_new_cluster_dist_small(cluster_dist_with_new_medoids) is True ): print("new is smaller") # compute clusters and cluster_distance with new medoids ( self.clusters, self.cluster_distances, ) = self.__calculate_clusters(self.medoids) print("clusters: ") pprint.PrettyPrinter(indent=2, compact=True).pprint( self.clusters ) plot_pam(self.__data, self.clusters) # print("clusters_distances: ", self.cluster_distances) else: # if the sum of cluster_distances doesn't improve, terminate the algorithm print("termination") break def __is_new_cluster_dist_small(self, cluster_dist_with_new_medoids): """ Return True if the new sum of cluster_distances is smaller than the previous one, and update the medoids, else return False. :param cluster_dist_with_new_medoids: cluster distances with new medoids. :return: True if the new sum of cluster_distances is smaller than the previous one, else False. """ # compute the existing sum of cluster_distances existance_dist = self.calculate_distance_of_clusters() print("present dist: ", existance_dist) # computes the new sum of cluster_distances new_dist = self.calculate_distance_of_clusters( cluster_dist_with_new_medoids ) print("new dist: ", new_dist) # if it is better, substitute the old medoids with the new ones and return True, else return False if ( existance_dist > new_dist and (existance_dist - new_dist) > self.tol ): self.medoids = cluster_dist_with_new_medoids.keys() return True return False
[docs] def calculate_distance_of_clusters(self, cluster_dist=None): """ If no argument is provided, just sum the distances of the existing cluster_distances, else sum the distances of the input cluster_distances. :param cluster_dist: if not None, cluster distances. :return: sum of cluster distances. """ if cluster_dist is None: cluster_dist = self.cluster_distances dist = 0 for medoid in cluster_dist.keys(): dist += cluster_dist[medoid] return dist
def __swap_and_recalculate_clusters(self): # http://www.math.le.ac.uk/people/ag153/homepage/KmeansKmedoids/Kmeans_Kmedoids.html """Return dictionary of new cluster_distances obtained by swapping medoids in each cluster.""" print("swap and recompute") cluster_dist = {} for medoid in self.medoids: # for each medoid is_shortest_medoid_found = False for data_index in self.clusters[ medoid ]: # for each point in the current medoid's cluster if data_index != medoid: # exclude the medoid itself # create a list of the elements of the cluster cluster_list = list(self.clusters[medoid]) # make the current point the temporary medoid cluster_list[ self.clusters[medoid].index(data_index) ] = medoid # compute new cluster distance obtained by swapping the medoid new_distance = self.calculate_inter_cluster_distance( data_index, cluster_list ) # if this new distance is smaller than the previous one if new_distance < self.cluster_distances[medoid]: print("new better medoid: ", data_index) cluster_dist[data_index] = new_distance is_shortest_medoid_found = True break # exit for loop for this medoid, since a better one has been found # if no better medoid has been found, keep the current one if is_shortest_medoid_found is False: print("no better medoids found, keep: ", medoid) cluster_dist[medoid] = self.cluster_distances[medoid] print("cluster_dist: ", cluster_dist) return cluster_dist
[docs] def calculate_inter_cluster_distance(self, medoid, cluster_list): """Compute the average distance of points in a cluster from their medoid.""" distance = 0 for data_index in cluster_list: distance += self.__get_distance(medoid, data_index) return distance / len(cluster_list)
def __calculate_clusters(self, medoids): """Return the clusters and the relative distances (average distance of each element of the cluster from the medoid).""" clusters = ( {} ) # it will be of the form {medoid0: [elements of cluster0], medoid1: [elements of cluster1], ...} cluster_distances = {} # initialize empty clusters and cluster_distances for medoid in medoids: clusters[medoid] = [] cluster_distances[medoid] = 0 for row in range(self.__rows): # for every row of input data # compute nearest medoid and relative distance from row ( nearest_medoid, nearest_distance, ) = self.__get_shortest_distance_to_medoid(row, medoids) # add this distance to the distances relative to the nearest_medoid cluster cluster_distances[nearest_medoid] += nearest_distance # add the row to the nearest_medoid cluster clusters[nearest_medoid].append(row) # divide each cluster_distance for the number of element in its corresponding cluster, to obtain the average # distance for medoid in medoids: cluster_distances[medoid] /= len(clusters[medoid]) return clusters, cluster_distances def __get_shortest_distance_to_medoid(self, row_index, medoids): """Return closest medoid and relative distance from the input row (point).""" min_distance = float("inf") current_medoid = None for medoid in medoids: current_distance = self.__get_distance( medoid, row_index ) # compute distance from input row to medoid if ( current_distance < min_distance ): # if it happens to be shorter than all previously computed distances min_distance = current_distance # save it as min_distance current_medoid = ( medoid # choose this medoid as the closest one ) return current_medoid, min_distance def __initialize_medoids(self): """Kmeans++ initialisation.""" print("initializing medoids with kmeans++") random.seed(self.__random_state) self.medoids.append( random.randint(0, self.__rows - 1) ) # choosing a random row from data while ( len(self.medoids) != self.n_cluster ): # until the number of medoids reaches the number of clusters self.medoids.append( self.__find_distant_medoid() ) # choose as next medoid the most distant from the previously chosen ones def __find_distant_medoid(self): """Return a row corresponding to a point which is considerably distant from its closest medoid.""" distances = [] indices = [] for row in range(self.__rows): # for every row in data indices.append(row) distances.append( self.__get_shortest_distance_to_medoid(row, self.medoids)[1] ) # shortest distance from row to its closest medoid distances_index = np.argsort( distances ) # the sorted indices of the distances choosen_dist = self.__select_distant_medoid( distances_index ) # the index corresponding to the distance chosen return indices[ choosen_dist ] # row corresponding to the chosen distance def __select_distant_medoid(self, distances_index): """Return a random index of the distances_index between start and end.""" start_index = round( self.start_prob * len(distances_index) ) # normally 0.8*len(dist) end_index = round( self.end_prob * (len(distances_index) - 1) ) # normally 0.99*len(dist) # returns a random index corresponding to a row which is distant from its closest medoid, but not necessarily # the row with the maximum distance from its medoid return distances_index[random.randint(start_index, end_index)] def __get_distance(self, x1, x2): """Compute euclidean distance, with an initial transformation based on input data.""" a = ( self.__data[x1].toarray() if self.__is_csr is True else np.array(self.__data[x1]) ) b = ( self.__data[x2].toarray() if self.__is_csr is True else np.array(self.__data[x2]) ) return np.linalg.norm(a - b) def __set_data_type(self): """To check whether the given input is of type list or csr.""" print("setting data type") if isinstance(self.__data, csr_matrix): self.__is_csr = True self.__rows = self.__data.shape[0] self.__columns = self.__data.shape[1] elif isinstance(self.__data, list): self.__is_csr = False self.__rows = len(self.__data) self.__columns = len(self.__data[0]) else: raise ValueError("Invalid input")
[docs]def plot_pam(data: pd.DataFrame, cl: dict, equal_axis_scale: bool = False) -> None: """ Scatterplot of data points, with colors according to cluster labels. Centers of mass of the clusters are marked with an X. :param data: input data sample as dataframe. :param cl: cluster dictionary. :param equal_axis_scale: if True, axis are plotted with the same scaling. """ _, ax = plt.subplots(figsize=(14, 6)) # all points are plotted in white ax.scatter( np.array(data)[:, 0], np.array(data)[:, 1], s=300, color="white", edgecolor="black", ) # plot the points with colors according to the cluster they belong to for i, el in enumerate(list(cl.values())): ax.scatter( np.array(data)[el, 0], np.array(data)[el, 1], s=300, color=COLOR_DICT[i % len(COLOR_DICT)], edgecolor="black", ) # plot centers of mass, marked with an X for i, el in enumerate(list(cl.keys())): ax.scatter( np.array(data)[el, 0], np.array(data)[el, 1], s=500, color="red", marker="X", edgecolor="black", ) # add indexes to points in plot annotate_points(annotations=range(len(data)), points=np.array(data), ax=ax) if equal_axis_scale is True: ax.set_aspect("equal", adjustable="box") plt.show()