Source code for clustviz.clarans

import random
from typing import Tuple, Dict, Any

import scipy
import itertools
import graphviz
import numpy as np
import pandas as pd
from clustviz.pam import plot_pam

from pyclustering.utils import euclidean_distance_square
from pyclustering.cluster.clarans import clarans as clarans_pyclustering


[docs]class clarans(clarans_pyclustering):
[docs] def process(self, plotting: bool = False): """! @brief Performs cluster analysis in line with rules of CLARANS algorithm. @return (clarans) Returns itself (CLARANS instance). @see get_clusters() @see get_medoids() """ random.seed() # loop for a numlocal number of times for _ in range(0, self.__numlocal): print("numlocal: ", _) # set (current) random medoids self.__current = random.sample( range(0, len(self.__pointer_data)), self.__number_clusters ) # update clusters in line with random allocated medoids self.__update_clusters(self.__current) # optimize configuration self.__optimize_configuration() # obtain cost of current cluster configuration and compare it with the best obtained estimation = self.__calculate_estimation() if estimation < self.__optimal_estimation: print( "Better configuration found with medoids: {0} and cost: {1}".format( self.__current[:], estimation ) ) self.__optimal_medoids = self.__current[:] self.__optimal_estimation = estimation if plotting is True: self.__update_clusters(self.__optimal_medoids) plot_pam( self.__pointer_data, dict(zip(self.__optimal_medoids, self.__clusters)), ) else: print( "Configuration found does not improve current best one because its cost is {0}".format( estimation ) ) if plotting is True: self.__update_clusters(self.__current[:]) plot_pam( self.__pointer_data, dict(zip(self.__current[:], self.__clusters)), ) self.__update_clusters(self.__optimal_medoids) if plotting is True: print("FINAL RESULT:") plot_pam( self.__pointer_data, dict(zip(self.__optimal_medoids, self.__clusters)), ) return self
def __optimize_configuration(self): """! @brief Finds quasi-optimal medoids and updates in line with them clusters in line with algorithm's rules. """ index_neighbor = 0 counter = 0 while index_neighbor < self.__maxneighbor: # get random current medoid that is to be replaced current_medoid_index = self.__current[ random.randint(0, self.__number_clusters - 1) ] current_medoid_cluster_index = self.__belong[current_medoid_index] # get new candidate to be medoid candidate_medoid_index = random.randint( 0, len(self.__pointer_data) - 1 ) while candidate_medoid_index in self.__current: candidate_medoid_index = random.randint( 0, len(self.__pointer_data) - 1 ) candidate_cost = 0.0 for point_index in range(0, len(self.__pointer_data)): if point_index not in self.__current: # get non-medoid point and its medoid point_cluster_index = self.__belong[point_index] point_medoid_index = self.__current[point_cluster_index] # get other medoid that is nearest to the point (except current and candidate) other_medoid_index = self.__find_another_nearest_medoid( point_index, current_medoid_index ) other_medoid_cluster_index = self.__belong[ other_medoid_index ] # for optimization calculate all required distances # from the point to current medoid distance_current = euclidean_distance_square( self.__pointer_data[point_index], self.__pointer_data[current_medoid_index], ) # from the point to candidate median distance_candidate = euclidean_distance_square( self.__pointer_data[point_index], self.__pointer_data[candidate_medoid_index], ) # from the point to nearest (own) medoid distance_nearest = float("inf") if (point_medoid_index != candidate_medoid_index) and ( point_medoid_index != current_medoid_cluster_index ): distance_nearest = euclidean_distance_square( self.__pointer_data[point_index], self.__pointer_data[point_medoid_index], ) # apply rules for cost calculation if point_cluster_index == current_medoid_cluster_index: # case 1: if distance_candidate >= distance_nearest: candidate_cost += ( distance_nearest - distance_current ) # case 2: else: candidate_cost += ( distance_candidate - distance_current ) elif point_cluster_index == other_medoid_cluster_index: # case 3 ('nearest medoid' is the representative object of that cluster and object is more # similar to 'nearest' than to 'candidate'): if distance_candidate > distance_nearest: pass # case 4: else: candidate_cost += ( distance_candidate - distance_nearest ) if candidate_cost < 0: counter += 1 # set candidate that has won self.__current[ current_medoid_cluster_index ] = candidate_medoid_index # recalculate clusters self.__update_clusters(self.__current) # reset iterations and starts investigation from the begining index_neighbor = 0 else: index_neighbor += 1 print("Medoid set changed {0} times".format(counter))
[docs]def compute_cost_clarans(data: pd.DataFrame, _cur_choice: list) -> Tuple[float, Dict[Any, list]]: """ A function to compute the configuration cost. (modified from that of CLARA) :param data: The input dataframe. :param _cur_choice: The current set of medoid choices. :return: The total configuration cost, the medoids. """ total_cost = 0.0 medoids = {} for idx in _cur_choice: medoids[idx] = [] for i in list(data.index): choice = -1 min_cost = np.inf for m in medoids: # fast_euclidean from CLARA tmp = np.linalg.norm(data.loc[m] - data.loc[i]) if tmp < min_cost: choice = m min_cost = tmp medoids[choice].append(i) total_cost += min_cost # print("total_cost: ", total_cost) return total_cost, medoids
[docs]def plot_tree_clarans(data: pd.DataFrame, k: int) -> None: """ plot G_{k,n} as in the paper of CLARANS; only to use with small input data. :param data: input DataFrame. :param k: number of points in each combination (possible set of medoids). """ n = len(data) num_points = int(scipy.special.binom(n, k)) num_neigh = k * (n - k) if (num_points > 50) or (num_neigh > 10): print( "Either graph nodes are more than 50 or neighbors are more than 10, the graph would be too big" ) return # all possibile combinations of k elements from input data name_nodes = list(itertools.combinations(list(data.index), k)) dot = graphviz.Digraph(comment="Clustering") # draw nodes, also adding the configuration cost for i in range(num_points): tot_cost, meds = compute_cost_clarans(data, list(name_nodes[i])) tc = round(tot_cost, 3) dot.node(str(name_nodes[i]), str(name_nodes[i]) + ": " + str(tc)) # only connect nodes if they have k-1 common elements for i in range(num_points): for j in range(num_points): if i != j: if ( len(set(list(name_nodes[i])) & set(list(name_nodes[j]))) == k - 1 ): dot.edge(str(name_nodes[i]), str(name_nodes[j])) graph = graphviz.Source(dot) # .view() display(graph)