# Source code for clustviz.birch

import numpy as np
from pyclustering.cluster.agglomerative import agglomerative, type_link
from pyclustering.cluster.encoder import cluster_encoder, type_encoding
from pyclustering.cluster.birch import birch as birch_pyclustering
from pyclustering.cluster import cluster_visualizer

from pyclustering.container.cftree import leaf_node, non_leaf_node, cfnode_type, measurement_type
from pyclustering.container.cftree import cftree as cftree_pyclustering

from clustviz.utils import COLOR_DICT, annotate_points


class cftree(cftree_pyclustering):
    """CF-tree that reports every insertion step on standard output.

    The class deliberately keeps the same name as the pyclustering class it
    extends. Double-underscore attributes such as ``self.__root`` are
    name-mangled with the class name, so sharing the name is what lets the
    methods below read and update the private state (and call the private
    helpers) of the base class. Do not rename this class.
    """

    def insert(self, entry):
        """
        Insert a clustering feature into the tree.

        :param entry: clustering feature that should be inserted.
        """
        print("insert entry")

        if self.__root is None:
            # Empty tree: the first entry becomes a leaf that is also the root.
            print("first time")
            node = leaf_node(entry, None, [entry])

            self.__root = node
            self.__leafes.append(node)

            # Update statistics
            self.__amount_entries += 1
            self.__amount_nodes += 1
            self.__height += 1
            return

        print("recursive insert")
        child_node_updation = self.__recursive_insert(entry, self.__root)

        if child_node_updation is True:
            print("try merge_nearest_successors")
            # A split happened below the root, so check whether two of the
            # root's children can be merged back together.
            if self.__merge_nearest_successors(self.__root) is True:
                self.__amount_nodes -= 1

    def __recursive_insert(self, entry, search_node) -> bool:
        """
        Insert the entry below ``search_node``, dispatching on the node type.

        :param entry: clustering feature.
        :param search_node: node from which the insertion should be started.
        :return: True if the number of nodes at the level below changed,
            otherwise False.
        """
        if search_node.type == cfnode_type.CFNODE_NONLEAF:
            print("insert for non-leaf")
            return self.__insert_for_noneleaf_node(entry, search_node)

        # Leaf is reached
        print("insert for leaf")
        return self.__insert_for_leaf_node(entry, search_node)

    def __insert_for_leaf_node(self, entry, search_node):
        """
        Insert the entry into a leaf node.

        The entry is absorbed by the nearest entry of the leaf when the
        diameter of the merged entry does not exceed the tree threshold;
        otherwise it is stored as a new entry and the leaf is split if it then
        holds more than the maximum number of entries.

        :param entry: clustering feature.
        :param search_node: leaf node that receives the entry.
        :return: True if the leaf has been split, otherwise False.
        """
        node_amount_updation = False

        index_nearest_entry = search_node.get_nearest_index_entry(
            entry, self.__type_measurement
        )
        nearest_entry = search_node.entries[index_nearest_entry]
        merged_entry = nearest_entry + entry
        merged_diameter = merged_entry.get_diameter()

        print("index_nearest_entry", index_nearest_entry)
        print("nearest entry", nearest_entry)
        print("diam:", merged_diameter)

        if merged_diameter > self.__threshold:
            # Too far from every existing entry: keep it as a separate entry.
            print("diam greater than threshold")
            search_node.insert_entry(entry)

            # The leaf overflowed, so it has to be split.
            if len(search_node.entries) > self.__max_entries:
                print("node has to split")
                self.__split_procedure(search_node)
                node_amount_updation = True

            # Update statistics
            self.__amount_entries += 1
        else:
            # Close enough: absorb the entry into its nearest neighbour.
            print("diam ok")
            search_node.entries[index_nearest_entry] = merged_entry
            search_node.feature += entry

        return node_amount_updation

    def __insert_for_noneleaf_node(self, entry, search_node):
        """
        Insert the entry below a non-leaf node.

        The insertion descends into the child whose clustering feature is the
        closest to ``entry``.

        :param entry: clustering feature.
        :param search_node: non-leaf node from which the insertion should be
            started.
        :return: True if ``search_node`` has been split, otherwise False.
        """
        node_amount_updation = False

        # The distance must be measured against the entry being inserted.
        # Measuring it against ``search_node`` (as this code used to do) picks
        # the same child for every entry, whatever its position.
        nearest_child_node = min(
            search_node.successors,
            key=lambda child_node: child_node.feature.get_distance(
                entry, self.__type_measurement
            ),
        )
        print("nearestchildnode: ", nearest_child_node)

        print("recursive insert in !!!insert_for_nonleaf!!!")
        child_node_updation = self.__recursive_insert(entry, nearest_child_node)

        # Update clustering feature of the non-leaf node.
        search_node.feature += entry

        # A split below may have pushed this node over the branching factor.
        if len(search_node.successors) > self.__branch_factor:
            print("over branch_factor ")

            # Splitting the root requires a new root above it, which makes
            # the tree one level higher.
            if search_node is self.__root:
                print("height increases")
                self.__root = non_leaf_node(
                    search_node.feature, None, [search_node]
                )
                search_node.parent = self.__root

                # Update statistics
                self.__amount_nodes += 1
                self.__height += 1

            print("split non-leaf node")
            [new_node1, new_node2] = self.__split_nonleaf_node(search_node)

            # Replace the split node with its two halves in the parent.
            parent = search_node.parent
            parent.successors.remove(search_node)
            parent.successors.append(new_node1)
            parent.successors.append(new_node2)

            # Update statistics
            self.__amount_nodes += 1
            node_amount_updation = True

        elif child_node_updation is True:
            # A child has been split without overflowing this node: check
            # whether two children can be merged.
            if self.__merge_nearest_successors(search_node) is True:
                self.__amount_nodes -= 1

        return node_amount_updation

    def __merge_nearest_successors(self, node):
        """
        Find the two nearest successors of ``node`` and merge them if possible.

        Merging is only attempted when the successors are non-leaf nodes and
        only performed when their combined number of successors does not
        exceed the branching factor.

        :param node: node whose two nearest successors should be merged.
        :return: True if merging has been performed, otherwise False.
        """
        merging_result = False

        if node.successors[0].type == cfnode_type.CFNODE_NONLEAF:
            [
                nearest_child_node1,
                nearest_child_node2,
            ] = node.get_nearest_successors(self.__type_measurement)

            combined_size = len(nearest_child_node1.successors) + len(
                nearest_child_node2.successors
            )
            if combined_size <= self.__branch_factor:
                node.successors.remove(nearest_child_node2)
                if nearest_child_node2.type == cfnode_type.CFNODE_LEAF:
                    self.__leafes.remove(nearest_child_node2)

                nearest_child_node1.merge(nearest_child_node2)
                merging_result = True

        if merging_result is True:
            print("merging successful")
        else:
            print("merging not successful")

        return merging_result

    def __split_leaf_node(self, node):
        """
        Split the specified leaf node into two new leaf nodes.

        The two farthest entries of the node seed the new leaves; every other
        entry is assigned to the new leaf whose feature is closer.

        :param node: leaf node that should be split.
        :return: new pair of leaf nodes ``[leaf_node1, leaf_node2]``.
        """
        print("split leaf")

        # search farthest pair of entries
        [farthest_entity1, farthest_entity2] = node.get_farthest_entries(
            self.__type_measurement
        )
        print("farthest1 ", farthest_entity1)
        print("farthest2 ", farthest_entity2)

        # create new nodes
        new_node1 = leaf_node(farthest_entity1, node.parent, [farthest_entity1])
        new_node2 = leaf_node(farthest_entity2, node.parent, [farthest_entity2])

        # re-insert other entries
        for entity in node.entries:
            if entity is farthest_entity1 or entity is farthest_entity2:
                continue

            distance1 = new_node1.feature.get_distance(
                entity, self.__type_measurement
            )
            distance2 = new_node2.feature.get_distance(
                entity, self.__type_measurement
            )

            if distance1 < distance2:
                new_node1.insert_entry(entity)
            else:
                new_node2.insert_entry(entity)

        print("new_node1 ", new_node1)
        print("new_node2 ", new_node2)

        return [new_node1, new_node2]

    def show_feature_distribution(self, data=None):
        """
        Show the distribution of the node centroids, level by level.

        :param data: list of points drawn together with the features; when it
            is not specified only the features are displayed.
        """
        visualizer = cluster_visualizer()

        print("amount of nodes: ", self.__amount_nodes)

        if data is not None:
            visualizer.append_cluster(data, marker="x")

        for level in range(0, self.height):
            level_nodes = self.get_level_nodes(level)
            centers = [node.feature.get_centroid() for node in level_nodes]

            # Marker size shrinks as the level index grows.
            visualizer.append_cluster(
                centers, None, markersize=(self.height - level + 1) * 5
            )

        visualizer.show()
class birch(birch_pyclustering):
    """BIRCH clustering that reports (and optionally plots) every step.

    The class deliberately keeps the same name as the pyclustering class it
    extends: double-underscore attributes such as ``self.__tree`` are
    name-mangled with the class name, so sharing the name is what lets the
    methods below use the private state of the base class. Do not rename
    this class.
    """

    def __init__(
        self,
        data,
        number_clusters,
        branching_factor=50,
        max_node_entries=200,
        diameter=0.5,
        type_measurement=measurement_type.CENTROID_EUCLIDEAN_DISTANCE,
        entry_size_limit=500,
        diameter_multiplier=1.5,
        ccore=True,
    ):
        """Forward every argument to the base class, then swap in the local CF-tree."""
        super().__init__(
            data,
            number_clusters,
            branching_factor,
            max_node_entries,
            diameter,
            type_measurement,
            entry_size_limit,
            diameter_multiplier,
            ccore,
        )

        # otherwise it would refer to pyclustering original cftree
        self.__tree = cftree(
            branching_factor, max_node_entries, diameter, type_measurement
        )

    def return_tree(self):
        """Return the tree built by the algorithm."""
        return self.__tree

    def process(self, plotting=False):
        """
        Perform cluster analysis in line with the rules of the BIRCH algorithm.

        :param plotting: if True, the tree and its leaves are plotted before
            every insertion except the first one.
        :return: the instance itself.
        """
        self.__insert_data(plotting=plotting)

        print("extracting features")
        self.__extract_features()
        print("features: ", self.__features)

        cf_data = [feature.get_centroid() for feature in self.__features]

        algorithm = agglomerative(
            cf_data, self.__number_clusters, type_link.SINGLE_LINK
        ).process()
        self.__cf_clusters = algorithm.get_clusters()

        encoder = cluster_encoder(
            type_encoding.CLUSTER_INDEX_LIST_SEPARATION,
            self.__cf_clusters,
            cf_data,
        )
        cf_labels = encoder.set_encoding(
            type_encoding.CLUSTER_INDEX_LABELING
        ).get_clusters()

        # Convert the centroids once instead of once per input point.
        centroids = np.asarray(cf_data)

        self.__clusters = [[] for _ in range(len(self.__cf_clusters))]
        for index_point, point in enumerate(self.__pointer_data):
            # Each point joins the cluster of its nearest feature centroid
            # (squared Euclidean distance).
            squared_distances = np.sum(
                np.square(np.subtract(centroids, point)), axis=1
            )
            index_cf_entry = np.argmin(squared_distances)
            index_cluster = cf_labels[index_cf_entry]
            self.__clusters[index_cluster].append(index_point)

        return self

    def __insert_data(self, plotting=False):
        """
        Insert the input data into the tree, point by point.

        If the number of entries exceeds the entry size limit, the diameter is
        increased and the tree is rebuilt.

        :param plotting: if True, the tree and its leaves are plotted before
            every insertion except the first one.
        """
        for index_point in range(0, len(self.__pointer_data)):
            if (index_point != 0) and (plotting is True):
                plot_tree_fin(self.__tree)
                plot_birch_leaves(self.__tree, data=self.__pointer_data)

            print("\n\n")
            print("index: {}".format(index_point))

            point = self.__pointer_data[index_point]
            print("point {}".format(point))

            self.__tree.insert_point(point)

            if self.__tree.amount_entries > self.__entry_size_limit:
                print("rebuilding tree")
                self.__tree = self.__rebuild_tree(index_point)

    def __rebuild_tree(self, index_point):
        """
        Rebuild the tree with a larger diameter after the entry limit is exceeded.

        The points up to ``index_point`` (inclusive) are re-inserted into a
        fresh tree. While the rebuilt tree still holds more entries than the
        limit, the diameter is multiplied again and the rebuild is repeated.

        :param index_point: index of the last point to re-insert.
        :return: rebuilt tree holding the points up to ``index_point``.
        """
        increased_diameter = self.__tree.threshold * self.__diameter_multiplier

        while True:
            # A zero diameter can never grow by multiplication.
            if increased_diameter == 0.0:
                increased_diameter = 1.0

            # build tree with updated parameters
            tree = cftree(
                self.__tree.branch_factor,
                self.__tree.max_entries,
                increased_diameter,
                self.__tree.type_measurement,
            )

            # A dedicated loop variable keeps ``index_point`` intact for the
            # next attempt.
            for position in range(0, index_point + 1):
                tree.insert_point(self.__pointer_data[position])

            # Re-build is successful.
            if tree.amount_entries <= self.__entry_size_limit:
                return tree

            next_diameter = increased_diameter * self.__diameter_multiplier
            if not next_diameter > increased_diameter:
                # The diameter cannot grow any further (multiplier <= 1 or
                # float overflow): return the current tree instead of looping
                # forever.
                return tree

            increased_diameter = next_diameter
def plot_tree_fin(tree, info=False):
    """
    Plot the final CFtree built by BIRCH. Leaves are colored, and every node
    displays the total number of elements in its child nodes.

    Under IPython/Jupyter the graph is rendered with ``display``; elsewhere
    the DOT source of the graph is printed instead.

    :param tree: tree built during BIRCH algorithm execution.
    :param info: if True, tree height, number of nodes, leaves and entries
        are printed.
    """
    import graphviz

    height = tree.height

    if info is True:
        print("Tree height is {0}".format(height))
        print("Number of nodes: {0}".format(tree.amount_nodes))
        print("Number of leaves: {0}".format(len(tree.leafes)))
        print("Number of entries: {0}".format(tree.amount_entries))

    if height < 1:
        # Nothing has been inserted yet, so there is no root to draw.
        print("Tree is empty, nothing to plot")
        return

    def level_summary(level_nodes):
        """Return the point count of every node and the successor count of
        every node that has successors (leaves have none)."""
        point_counts = []
        successor_counts = []
        for node in level_nodes:
            point_counts.append(node.feature.number_points)
            successors = getattr(node, "successors", None)
            if successors is not None:
                successor_counts.append(len(successors))
        return point_counts, successor_counts

    # collecting data for each tree level
    feat_num = []
    succ_num = []
    for lev in range(height):
        point_counts, successor_counts = level_summary(tree.get_level_nodes(lev))
        feat_num.append(point_counts)
        succ_num.append(successor_counts)

    # collect data of leaves: the sizes of the entries held by every leaf
    single_entries = [
        [single_entry.number_points for single_entry in leaf.entries]
        for leaf in tree.get_level_nodes(height - 1)
    ]

    # creating names for nodes: every two-letter combination
    prov = (
        "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c "
        "d e f g h i j k l m n o p q r s t u v w x y z".split(" ")
    )
    lett = [l1 + l2 for l1 in prov for l2 in prov]

    # The graph holds one box per tree node plus one extra box per leaf (the
    # list of its entries), and every box needs its own name.
    node_limit = len(lett)
    boxes_needed = sum(len(counts) for counts in feat_num) + len(single_entries)
    if boxes_needed > node_limit:
        print("Too many nodes, limit is {0}".format(node_limit))
        return

    # creating the tree
    dot = graphviz.Digraph(comment="Clustering")

    # root
    dot.node(lett[0], str(feat_num[0][0]))

    # all levels below the root, named sequentially in level order
    next_name = 1
    for level in range(1, height):
        for point_count in feat_num[level]:
            dot.node(lett[next_name], str(point_count))
            next_name += 1

    # entry boxes of the leaves, with colors
    for leaf_index, entry_sizes in enumerate(single_entries):
        dot.node(
            lett[next_name + leaf_index],
            str(entry_sizes),
            color=COLOR_DICT[leaf_index % len(COLOR_DICT)],
            style="filled",
        )

    # adding edges between nodes: ``parent`` walks the nodes in level order
    # while ``last_child`` is the name index of the last child linked so far
    parent = 0
    last_child = 0

    # for all nodes except leaves
    for level in range(0, height):
        for num_succs in succ_num[level]:
            for offset in range(1, num_succs + 1):
                dot.edge(lett[parent], lett[last_child + offset])
            parent += 1
            last_child += num_succs

    # for leaves: link every leaf to its own entry box
    for leaf_index in range(len(single_entries)):
        dot.edge(lett[parent + leaf_index], lett[last_child + leaf_index + 1])

    # ``Source`` expects DOT text, not a ``Digraph`` object.
    graph = graphviz.Source(dot.source)

    # show tree; ``display`` only exists when running under IPython/Jupyter
    try:
        show = display
    except NameError:
        show = None

    if show is not None:
        show(graph)
    else:
        print(graph.source)
def plot_birch_leaves(tree, data):
    """
    Scatter plot of the data points, colored according to the leaf they
    belong to. An entry holding several points is drawn as a cross at its
    centroid, annotated with the number of points it holds.

    :param tree: tree built during BIRCH algorithm execution.
    :param data: input data as array of list of list
    """
    import matplotlib.pyplot as plt

    _, ax = plt.subplots(figsize=(14, 6))

    points = np.array(data)

    # plot every point in white
    plt.scatter(
        points[:, 0],
        points[:, 1],
        s=300,
        color="white",
        edgecolor="black",
    )

    leaves = tree.get_level_nodes(tree.height - 1)
    for leaf_index, leaf in enumerate(leaves):
        # one color per leaf, cycling through the palette
        leaf_color = COLOR_DICT[leaf_index % len(COLOR_DICT)]

        for entry in leaf.entries:
            # a single point is drawn in place with the color of its leaf
            if entry.number_points == 1:
                plt.scatter(
                    entry.linear_sum[0],
                    entry.linear_sum[1],
                    color=leaf_color,
                    s=300,
                    edgecolor="black",
                )
                continue

            # several points: mark the centroid with a cross and leave the
            # points themselves white
            centroid = entry.get_centroid()
            plt.scatter(
                centroid[0],
                centroid[1],
                color=leaf_color,
                marker="X",
                s=200,
            )
            plt.annotate(
                entry.number_points,
                (centroid[0], centroid[1]),
                fontsize=18,
            )

    # plot indexes of points in plot
    annotate_points(annotations=range(len(data)), points=points, ax=ax)

    plt.show()