Coverage for src\cognitivefactory\interactive_clustering\clustering\kmeans.py: 100.00%
87 statements
coverage.py v7.3.2, created at 2023-11-17 13:31 +0100
# -*- coding: utf-8 -*-

"""
* Name: cognitivefactory.interactive_clustering.clustering.kmeans
* Description: Implementation of constrained kmeans clustering algorithms.
* Author: Erwan SCHILD
* Created: 17/03/2021
* Licence: CeCILL-C License v1.0 (https://cecill.info/licences.fr.html)
"""
# ==============================================================================
# IMPORT PYTHON DEPENDENCIES
# ==============================================================================

import random
from typing import Dict, List, Optional

from scipy.sparse import csr_matrix, vstack
from sklearn.metrics import pairwise_distances

from cognitivefactory.interactive_clustering.clustering.abstract import (
    AbstractConstrainedClustering,
    rename_clusters_by_order,
)
from cognitivefactory.interactive_clustering.constraints.abstract import AbstractConstraintsManager
# ==============================================================================
# KMEANS CONSTRAINED CLUSTERING
# ==============================================================================
class KMeansConstrainedClustering(AbstractConstrainedClustering):
    """
    This class implements the KMeans constrained clustering.
    It inherits from `AbstractConstrainedClustering`.

    References:
        - KMeans Clustering: `MacQueen, J. (1967). Some methods for classification and analysis of multivariate observations. Proceedings of the fifth Berkeley symposium on mathematical statistics and probability 1(14), 281–297.`
        - Constrained _'COP'_ KMeans Clustering: `Wagstaff, K., C. Cardie, S. Rogers, and S. Schroedl (2001). Constrained K-means Clustering with Background Knowledge. International Conference on Machine Learning.`

    Example:
        ```python
        # Import.
        from scipy.sparse import csr_matrix
        from cognitivefactory.interactive_clustering.constraints.binary import BinaryConstraintsManager
        from cognitivefactory.interactive_clustering.clustering.kmeans import KMeansConstrainedClustering

        # Create an instance of kmeans clustering.
        clustering_model = KMeansConstrainedClustering(
            model="COP",
            random_seed=2,
        )

        # Define vectors.
        # NB: use cognitivefactory.interactive_clustering.utils to preprocess and vectorize texts.
        vectors = {
            "0": csr_matrix([1.00, 0.00, 0.00, 0.00]),
            "1": csr_matrix([0.95, 0.02, 0.02, 0.01]),
            "2": csr_matrix([0.98, 0.00, 0.02, 0.00]),
            "3": csr_matrix([0.99, 0.00, 0.01, 0.00]),
            "4": csr_matrix([0.50, 0.22, 0.21, 0.07]),
            "5": csr_matrix([0.50, 0.21, 0.22, 0.07]),
            "6": csr_matrix([0.01, 0.01, 0.01, 0.97]),
            "7": csr_matrix([0.00, 0.01, 0.00, 0.99]),
            "8": csr_matrix([0.00, 0.00, 0.00, 1.00]),
        }

        # Define constraints manager.
        constraints_manager = BinaryConstraintsManager(list_of_data_IDs=list(vectors.keys()))
        constraints_manager.add_constraint(data_ID1="0", data_ID2="1", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="0", data_ID2="7", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="0", data_ID2="8", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="4", data_ID2="5", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="0", data_ID2="4", constraint_type="CANNOT_LINK")
        constraints_manager.add_constraint(data_ID1="2", data_ID2="4", constraint_type="CANNOT_LINK")

        # Run clustering.
        dict_of_predicted_clusters = clustering_model.cluster(
            constraints_manager=constraints_manager,
            vectors=vectors,
            nb_clusters=3,
        )

        # Print results.
        print("Expected results", ":", {"0": 0, "1": 0, "2": 1, "3": 1, "4": 2, "5": 2, "6": 0, "7": 0, "8": 0,})
        print("Computed results", ":", dict_of_predicted_clusters)
        ```
    """
    # ==============================================================================
    # INITIALIZATION
    # ==============================================================================
    def __init__(
        self,
        model: str = "COP",
        max_iteration: int = 150,
        tolerance: float = 1e-4,
        random_seed: Optional[int] = None,
        **kargs,
    ) -> None:
        """
        The constructor for the KMeans Constrained Clustering class.

        Args:
            model (str, optional): The kmeans clustering model to use. Available kmeans models are `"COP"`. Defaults to `"COP"`.
            max_iteration (int, optional): The maximum number of kmeans iterations for convergence. Defaults to `150`.
            tolerance (float, optional): The tolerance for convergence computation. Defaults to `1e-4`.
            random_seed (Optional[int]): The random seed to use to redo the same clustering. Defaults to `None`.
            **kargs (dict): Other parameters that can be used in the instantiation.

        Raises:
            ValueError: if some parameters are incorrectly set.
        """

        # Store `self.model`.
        if model != "COP":  # TODO use `not in {"COP"}`.
            raise ValueError("The `model` '" + str(model) + "' is not implemented.")
        self.model: str = model

        # Store `self.max_iteration`.
        if max_iteration < 1:
            raise ValueError("The `max_iteration` must be greater than or equal to 1.")
        self.max_iteration: int = max_iteration

        # Store `self.tolerance`.
        if tolerance < 0:
            raise ValueError("The `tolerance` must be greater than or equal to 0.0.")
        self.tolerance: float = tolerance

        # Store `self.random_seed`.
        self.random_seed: Optional[int] = random_seed

        # Store `self.kargs` for kmeans clustering.
        self.kargs = kargs

        # Initialize `self.dict_of_predicted_clusters`.
        self.dict_of_predicted_clusters: Optional[Dict[str, int]] = None
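
    # NB: a minimal usage sketch of this constructor (illustrative only; the parameter values below
    # are assumptions, not recommended settings):
    #
    #     model = KMeansConstrainedClustering(model="COP", max_iteration=100, tolerance=1e-4, random_seed=42)
    #     KMeansConstrainedClustering(model="unknown")  # Raises `ValueError`: model is not implemented.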
    # ==============================================================================
    # MAIN - CLUSTER DATA
    # ==============================================================================
    def cluster(
        self,
        constraints_manager: AbstractConstraintsManager,
        vectors: Dict[str, csr_matrix],
        nb_clusters: Optional[int],
        verbose: bool = False,
        **kargs,
    ) -> Dict[str, int]:
        """
        The main method used to cluster data with the KMeans model.

        Args:
            constraints_manager (AbstractConstraintsManager): A constraints manager over data IDs that will force clustering to respect some conditions during computation.
            vectors (Dict[str, csr_matrix]): The representation of data vectors. The keys of the dictionary represent the data IDs. These keys have to refer to the list of data IDs managed by the `constraints_manager`. The values of the dictionary represent the vector of each data.
            nb_clusters (Optional[int]): The number of clusters to compute. # TODO: set default to None with elbow method or another method?
            verbose (bool, optional): Enable verbose output. Defaults to `False`.
            **kargs (dict): Other parameters that can be used in the clustering.

        Raises:
            ValueError: if `vectors` and `constraints_manager` are incompatible, or if some parameters are incorrectly set.

        Returns:
            Dict[str,int]: A dictionary that contains the predicted cluster for each data ID.
        """
        ###
        ### GET PARAMETERS
        ###

        # Store `self.constraints_manager` and `self.list_of_data_IDs`.
        if not isinstance(constraints_manager, AbstractConstraintsManager):
            raise ValueError("The `constraints_manager` parameter has to be an `AbstractConstraintsManager` type.")
        self.constraints_manager: AbstractConstraintsManager = constraints_manager
        self.list_of_data_IDs: List[str] = self.constraints_manager.get_list_of_managed_data_IDs()

        # Store `self.vectors`.
        if not isinstance(vectors, dict):
            raise ValueError("The `vectors` parameter has to be a `dict` type.")
        self.vectors: Dict[str, csr_matrix] = vectors

        # Store `self.nb_clusters`.
        if (nb_clusters is None) or (nb_clusters < 2):
            raise ValueError("The `nb_clusters` '" + str(nb_clusters) + "' must be greater than or equal to 2.")
        self.nb_clusters: int = min(nb_clusters, len(self.list_of_data_IDs))

        ###
        ### RUN KMEANS CONSTRAINED CLUSTERING
        ###

        # Initialize `self.dict_of_predicted_clusters`.
        self.dict_of_predicted_clusters = None

        # Case of `"COP"` KMeans clustering.
        ##### DEFAULTS : if self.model=="COP":
        self.dict_of_predicted_clusters = self._clustering_kmeans_model_COP(verbose=verbose)

        ###
        ### RETURN PREDICTED CLUSTERS
        ###

        return self.dict_of_predicted_clusters
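
    # NB: `cluster(...)` caps `self.nb_clusters` at the number of managed data IDs, so asking for
    # more clusters than data is safe. A minimal sketch of that behavior (illustrative only; the
    # three-point dataset below is an assumption, not a library fixture):
    #
    #     manager = BinaryConstraintsManager(list_of_data_IDs=["a", "b", "c"])
    #     vectors = {"a": csr_matrix([1.0, 0.0]), "b": csr_matrix([0.0, 1.0]), "c": csr_matrix([1.0, 1.0])}
    #     model = KMeansConstrainedClustering(random_seed=1)
    #     model.cluster(constraints_manager=manager, vectors=vectors, nb_clusters=10)  # Effectively runs with 3 clusters.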
    # ==============================================================================
    # IMPLEMENTATION - COP KMEANS CLUSTERING
    # ==============================================================================
    def _clustering_kmeans_model_COP(self, verbose: bool = False) -> Dict[str, int]:
        """
        Implementation of the COP-kmeans algorithm, based on a constraint-violation check during cluster assignment.

        References:
            - Constrained _'COP'_ KMeans Clustering: `Wagstaff, K., C. Cardie, S. Rogers, and S. Schroedl (2001). Constrained K-means Clustering with Background Knowledge. International Conference on Machine Learning.`

        Args:
            verbose (bool, optional): Enable verbose output. Defaults to `False`.

        Returns:
            Dict[str,int]: A dictionary that contains the predicted cluster for each data ID.
        """
        ###
        ### INITIALIZATION OF CENTROIDS
        ###

        # Initialize `self.centroids`.
        self.centroids: Dict[int, csr_matrix] = self.initialize_centroids()

        # Initialize clusters.
        self.clusters: Dict[str, int] = {data_ID: -1 for data_ID in self.list_of_data_IDs}

        ###
        ### RUN KMEANS UNTIL CONVERGENCE
        ###

        # Define variables for convergence checks.
        converged: bool = False
        shift: float = float("Inf")
        current_iteration: int = 0

        # While no convergence...
        while (not converged) and (current_iteration < self.max_iteration):
            # Increase current iteration number.
            current_iteration += 1

            ###
            ### ASSIGN DATA TO THE CLOSEST CLUSTER WITH CONSTRAINTS RESPECT
            ###

            # Initialize temporary results.
            new_clusters: Dict[str, int] = {data_ID: -1 for data_ID in self.list_of_data_IDs}

            # Get the list of data IDs to assign.
            list_of_data_IDs_to_assign: List[str] = self.list_of_data_IDs.copy()

            # Precompute pairwise distances between data IDs and cluster centroids.
            matrix_of_pairwise_distances_between_data_and_clusters: csr_matrix = pairwise_distances(
                X=vstack(  # Vectors of data IDs.
                    self.vectors[data_ID] for data_ID in self.constraints_manager.get_list_of_managed_data_IDs()
                ),
                Y=vstack(  # Vectors of cluster centroids.
                    centroid_vector for centroid_vector in self.centroids.values()
                ),
                metric="euclidean",  # TODO: get different pairwise_distances config in **kargs.
            )

            # Format pairwise distances in a dictionary.
            dict_of_pairwise_distances_between_data_and_clusters: Dict[str, Dict[int, float]] = {
                data_ID: {
                    cluster_ID: float(matrix_of_pairwise_distances_between_data_and_clusters[i_data, i_cluster])
                    for i_cluster, cluster_ID in enumerate(self.centroids.keys())
                }
                for i_data, data_ID in enumerate(self.constraints_manager.get_list_of_managed_data_IDs())
            }
            # While not all data are assigned...
            while list_of_data_IDs_to_assign:
                # Get a data ID to assign.
                data_ID_to_assign: str = list_of_data_IDs_to_assign.pop()

                # Get the list of cluster IDs that are not compatible with this assignment.
                not_compatible_cluster_IDs: List[int] = [
                    new_clusters[data_ID]
                    for data_ID in self.list_of_data_IDs
                    if (new_clusters[data_ID] != -1)
                    and (
                        self.constraints_manager.get_inferred_constraint(
                            data_ID1=data_ID_to_assign,
                            data_ID2=data_ID,
                        )
                        == "CANNOT_LINK"
                    )
                ]

                # Get the list of possible cluster IDs for assignment.
                possible_cluster_IDs: List[int] = [
                    cluster_ID for cluster_ID in self.centroids.keys() if cluster_ID not in not_compatible_cluster_IDs
                ]

                # If there is no possible cluster...
                if possible_cluster_IDs == []:  # noqa: WPS520
                    # Assign the data ID to a new cluster.
                    new_clusters[data_ID_to_assign] = max(
                        max([cluster_ID for _, cluster_ID in new_clusters.items()]) + 1, self.nb_clusters
                    )

                # If there are possible clusters...
                else:
                    # Get the distance between the data ID and all possible centroids.
                    distances_to_cluster_ID: Dict[float, int] = {
                        dict_of_pairwise_distances_between_data_and_clusters[data_ID_to_assign][cluster_ID]: cluster_ID
                        for cluster_ID in possible_cluster_IDs
                    }

                    # Get the closest cluster to the data ID by distance.
                    min_distance: float = min(distances_to_cluster_ID)
                    new_clusters[data_ID_to_assign] = distances_to_cluster_ID[min_distance]

                # Assign all data IDs that are linked by a `"MUST_LINK"` constraint to the new cluster.
                for data_ID in self.list_of_data_IDs:
                    if (
                        self.constraints_manager.get_inferred_constraint(
                            data_ID1=data_ID_to_assign,
                            data_ID2=data_ID,
                        )
                        == "MUST_LINK"
                    ):
                        if data_ID in list_of_data_IDs_to_assign:
                            list_of_data_IDs_to_assign.remove(data_ID)
                        new_clusters[data_ID] = new_clusters[data_ID_to_assign]
            ###
            ### COMPUTE NEW CENTROIDS
            ###

            # Compute new centroids.
            new_centroids: Dict[int, csr_matrix] = self.compute_centroids(clusters=new_clusters)

            ###
            ### CHECK CONVERGENCE
            ###

            # If the set of cluster IDs is unchanged, check convergence by centroid difference (with tolerance).
            if set(self.clusters.values()) == set(new_clusters.values()):
                # Precompute distances between old and new cluster centroids.
                matrix_of_pairwise_distances_between_old_and_new_clusters: csr_matrix = pairwise_distances(
                    X=vstack(  # Old cluster centroids.
                        self.centroids[cluster_ID] for cluster_ID in self.centroids.keys()
                    ),
                    Y=vstack(  # New cluster centroids.
                        new_centroids[cluster_ID] for cluster_ID in self.centroids.keys()
                    ),
                    metric="euclidean",  # TODO: get different pairwise_distances config in **kargs.
                )

                # Compute the shift between kmeans iterations.
                shift = sum(
                    matrix_of_pairwise_distances_between_old_and_new_clusters[i][i]
                    for i in range(matrix_of_pairwise_distances_between_old_and_new_clusters.shape[0])
                )

                # If the shift is under tolerance: convergence!
                converged = shift < self.tolerance

            # Otherwise, the number of clusters has changed.
            else:
                # Incomparable shift.
                shift = float("Inf")

                # No convergence.
                converged = False

            ###
            ### APPLY NEW COMPUTATIONS
            ###

            # Assign new clusters.
            self.clusters = new_clusters

            # Assign new centroids.
            self.centroids = new_centroids

            # Verbose.
            if verbose and (current_iteration % 5 == 0):  # pragma: no cover
                # Verbose - Print progression status.
                print("    CLUSTERING_ITERATION=" + str(current_iteration), ",", "shift=" + str(shift))

        # Verbose.
        if verbose:  # pragma: no cover
            # Verbose - Print progression status.
            print("    CLUSTERING_ITERATION=" + str(current_iteration), ",", "converged=" + str(converged))

        # Rename cluster IDs by order, then recompute centroids so their keys match the renamed cluster IDs.
        self.clusters = rename_clusters_by_order(clusters=self.clusters)
        self.centroids = self.compute_centroids(clusters=self.clusters)

        return self.clusters
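
    # NB: a minimal sketch of the COP assignment rule implemented above (illustrative only; the
    # cluster IDs and distances below are assumptions): a data ID goes to the closest centroid whose
    # cluster holds no `"CANNOT_LINK"` neighbour; if every cluster is excluded, a new cluster is created.
    #
    #     distances = {"c0": 0.1, "c1": 0.4}  # Distances from the data ID to each centroid.
    #     excluded = {"c0"}                   # Clusters containing a CANNOT_LINK neighbour.
    #     feasible = {k: v for k, v in distances.items() if k not in excluded}
    #     assignment = min(feasible, key=feasible.get) if feasible else "new_cluster"  # -> "c1"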
    # ==============================================================================
    # INITIALIZATION OF CLUSTERS
    # ==============================================================================
    def initialize_centroids(
        self,
    ) -> Dict[int, csr_matrix]:
        """
        Initialize the centroid of each cluster by a vector.
        The choice is based on a random selection among the data to cluster.

        Returns:
            Dict[int, csr_matrix]: A dictionary which represents each cluster by a centroid.
        """

        # Get the list of possible indices.
        indices: List[str] = self.list_of_data_IDs.copy()

        # Shuffle the list of possible indices.
        random.seed(self.random_seed)
        random.shuffle(indices)

        # Subset indices.
        indices = indices[: self.nb_clusters]

        # Set initial centroids based on vectors.
        centroids: Dict[int, csr_matrix] = {
            cluster_ID: self.vectors[indices[cluster_ID]] for cluster_ID in range(self.nb_clusters)
        }

        # Return centroids.
        return centroids
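
    # NB: centroid initialization is a seeded random draw of `nb_clusters` data vectors, so fixing
    # `random_seed` makes runs reproducible. A minimal sketch of that property (illustrative only):
    #
    #     model_a = KMeansConstrainedClustering(random_seed=42)
    #     model_b = KMeansConstrainedClustering(random_seed=42)
    #     # Given the same `constraints_manager` and `vectors`, both models yield identical clusters.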
    # ==============================================================================
    # COMPUTE NEW CENTROIDS
    # ==============================================================================
    def compute_centroids(
        self,
        clusters: Dict[str, int],
    ) -> Dict[int, csr_matrix]:
        """
        Compute the centroid of each cluster.

        Args:
            clusters (Dict[str,int]): Current cluster assignment.

        Returns:
            Dict[int, csr_matrix]: A dictionary which represents each cluster by a centroid.
        """

        # Initialize centroids.
        centroids: Dict[int, csr_matrix] = {}

        # For all possible cluster IDs...
        for cluster_ID in set(clusters.values()):
            # Compute cluster members.
            members_of_cluster_ID: List[csr_matrix] = [
                self.vectors[data_ID]
                for data_ID in self.constraints_manager.get_list_of_managed_data_IDs()
                if clusters[data_ID] == cluster_ID
            ]

            # Compute the centroid as the mean of the members' vectors.
            centroid_for_cluster_ID: csr_matrix = sum(members_of_cluster_ID) / len(members_of_cluster_ID)

            # Add the centroid.
            centroids[cluster_ID] = centroid_for_cluster_ID

        # Return centroids.
        return centroids
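

# NB: a minimal runnable sketch of the centroid computation used by `compute_centroids`
# (illustrative only; the vectors below are assumptions, not library fixtures): the centroid of a
# cluster is the element-wise mean of its members' sparse vectors.
if __name__ == "__main__":  # pragma: no cover
    # Two members of a hypothetical cluster.
    members = [csr_matrix([1.0, 0.0, 0.0]), csr_matrix([0.0, 1.0, 0.0])]

    # Mean of sparse rows, as in `compute_centroids`.
    centroid = sum(members) / len(members)
    print(centroid.toarray())  # [[0.5 0.5 0. ]]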