Coverage for src\cognitivefactory\interactive_clustering\clustering\dbscan.py: 82.98% (156 statements)

# -*- coding: utf-8 -*-

"""
* Name: cognitivefactory.interactive_clustering.clustering.dbscan
* Description: Implementation of constrained DBScan clustering algorithms.
* Author: Marc TRUTT, Esther LENOTRE, David NICOLAZO
* Created: 08/05/2022
* Licence: CeCILL (https://cecill.info/licences.fr.html)
"""

# ==============================================================================
# IMPORT PYTHON DEPENDENCIES
# ==============================================================================

import warnings

# import random
from typing import Dict, List, Optional

import numpy as np
from scipy.sparse import csr_matrix, vstack
from sklearn.metrics import pairwise_distances

from cognitivefactory.interactive_clustering.clustering.abstract import (
    AbstractConstrainedClustering,
    rename_clusters_by_order,
)
from cognitivefactory.interactive_clustering.constraints.abstract import AbstractConstraintsManager


# ==============================================================================
# DBSCAN CONSTRAINED CLUSTERING
# ==============================================================================
class DBScanConstrainedClustering(AbstractConstrainedClustering):
    """
    This class implements the DBScan constrained clustering.
    It inherits from `AbstractConstrainedClustering`.

    References:
        - DBScan Clustering: `Ester, Martin & Kröger, Peer & Sander, Joerg & Xu, Xiaowei. (1996). A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise. KDD. 96. 226-231`.
        - Constrained DBScan Clustering: `Ruiz, Carlos & Spiliopoulou, Myra & Menasalvas, Ernestina. (2007). C-DBSCAN: Density-Based Clustering with Constraints. 216-223. 10.1007/978-3-540-72530-5_25.`
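
    Algorithm outline (following the C-DBSCAN scheme of Ruiz et al., as implemented below):
        1. Create local clusters: each point with at least `min_samples` neighbors within `eps` seeds a local cluster made of its neighborhood; a neighborhood containing a `CANNOT_LINK` pair is split into single-point clusters.
        2. Merge local clusters that are tied together by `MUST_LINK` constraints into core local clusters.
        3. Grow each core local cluster by absorbing its closest local clusters, as long as the distance stays within `eps` and no `CANNOT_LINK` constraint is violated.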

    Example:
        ```python
        # Import.
        from scipy.sparse import csr_matrix
        from cognitivefactory.interactive_clustering.constraints.binary import BinaryConstraintsManager
        from cognitivefactory.interactive_clustering.clustering.dbscan import DBScanConstrainedClustering

        # Create an instance of C-DBScan clustering.
        clustering_model = DBScanConstrainedClustering(
            eps=0.02,
            min_samples=3,
        )

        # Define vectors.
        # NB: use cognitivefactory.interactive_clustering.utils to preprocess and vectorize texts.
        vectors = {
            "0": csr_matrix([1.00, 0.00, 0.00, 0.00]),
            "1": csr_matrix([0.95, 0.02, 0.02, 0.01]),
            "2": csr_matrix([0.98, 0.00, 0.02, 0.00]),
            "3": csr_matrix([0.99, 0.00, 0.01, 0.00]),
            "4": csr_matrix([0.50, 0.22, 0.21, 0.07]),
            "5": csr_matrix([0.50, 0.21, 0.22, 0.07]),
            "6": csr_matrix([0.01, 0.01, 0.01, 0.97]),
            "7": csr_matrix([0.00, 0.01, 0.00, 0.99]),
            "8": csr_matrix([0.00, 0.00, 0.00, 1.00]),
        }

        # Define constraints manager.
        constraints_manager = BinaryConstraintsManager(list_of_data_IDs=list(vectors.keys()))
        constraints_manager.add_constraint(data_ID1="0", data_ID2="1", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="0", data_ID2="7", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="0", data_ID2="8", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="4", data_ID2="5", constraint_type="MUST_LINK")
        constraints_manager.add_constraint(data_ID1="0", data_ID2="4", constraint_type="CANNOT_LINK")
        constraints_manager.add_constraint(data_ID1="2", data_ID2="4", constraint_type="CANNOT_LINK")

        # Run clustering.
        dict_of_predicted_clusters = clustering_model.cluster(
            constraints_manager=constraints_manager,
            vectors=vectors,
            # nb_clusters=None,
        )

        # Print results.
        print("Expected results", ":", {"0": 0, "1": 0, "2": 1, "3": 1, "4": 2, "5": 2, "6": 0, "7": 0, "8": 0,})
        print("Computed results", ":", dict_of_predicted_clusters)
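
        # Inspect the cluster counters set by `.cluster(...)`
        # (these attributes are initialized in `__init__` and filled after clustering).
        print("Number of clusters", ":", clustering_model.number_of_clusters)
        print("Number of regular clusters", ":", clustering_model.number_of_regular_clusters)
        print("Number of single noise point clusters", ":", clustering_model.number_of_single_noise_point_clusters)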
        ```

    Warns:
        FutureWarning: `clustering.dbscan.DBScanConstrainedClustering` is still in development and is not fully tested: it is not ready for production use.
    """

    # ==============================================================================
    # INITIALIZATION
    # ==============================================================================
    def __init__(
        self,
        eps: float = 0.5,
        min_samples: int = 5,
        random_seed: Optional[int] = None,
        **kargs,
    ) -> None:
        """
        The constructor for the DBScan Constrained Clustering class.

        Args:
            eps (float): The maximum radius of a neighborhood around its center. Defaults to `0.5`.
            min_samples (int): The minimum number of points in a neighborhood to consider a center as a core point. Defaults to `5`.
            random_seed (Optional[int]): The random seed to use to redo the same clustering. Defaults to `None`.
            **kargs (dict): Other parameters that can be used in the instantiation.

        Warns:
            FutureWarning: `clustering.dbscan.DBScanConstrainedClustering` is still in development and is not fully tested: it is not ready for production use.

        Raises:
            ValueError: if some parameters are incorrectly set.
        """

        # Development warning.
        warnings.warn(
            "`clustering.dbscan.DBScanConstrainedClustering` is still in development and is not fully tested: it is not ready for production use.",
            FutureWarning,  # DeprecationWarning
            stacklevel=2,
        )

        # Store `self.eps`.
        if eps <= 0:
            raise ValueError("The `eps` must be greater than 0.")
        self.eps: float = eps

        # Store `self.min_samples`.
        if min_samples <= 0:
            raise ValueError("The `min_samples` must be greater than or equal to 1.")
        self.min_samples: int = min_samples

        # Store `self.random_seed`.
        self.random_seed: Optional[int] = random_seed

        # Store `self.kargs` for DBScan clustering.
        self.kargs = kargs

        # Initialize `self.dict_of_predicted_clusters`.
        self.dict_of_predicted_clusters: Optional[Dict[str, int]] = None

        # Initialize number of clusters attributes.
        self.number_of_single_noise_point_clusters: int = 0
        self.number_of_regular_clusters: int = 0
        self.number_of_clusters: int = 0
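        # NB: these counters are only meaningful after `.cluster(...)` has been run;
        # `number_of_clusters` then counts both regular clusters and single noise point clusters.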

    # ==============================================================================
    # MAIN - CLUSTER DATA
    # ==============================================================================
    def cluster(
        self,
        constraints_manager: AbstractConstraintsManager,
        vectors: Dict[str, csr_matrix],
        nb_clusters: Optional[int] = None,
        verbose: bool = False,
        **kargs,
    ) -> Dict[str, int]:
        """
        The main method used to cluster data with the DBScan model.

        Args:
            constraints_manager (AbstractConstraintsManager): A constraints manager over data IDs that will force clustering to respect some conditions during computation.
            vectors (Dict[str, csr_matrix]): The representation of data vectors. The keys of the dictionary represent the data IDs. These keys have to refer to the list of data IDs managed by the `constraints_manager`. The values of the dictionary represent the vectors of the data.
            nb_clusters (Optional[int]): The number of clusters to compute. Must be `None` for DBScan clustering.
            verbose (bool, optional): Enable verbose output. Defaults to `False`.
            **kargs (dict): Other parameters that can be used in the clustering.

        Raises:
            ValueError: if `vectors` and `constraints_manager` are incompatible, or if some parameters are incorrectly set.

        Returns:
            Dict[str, int]: A dictionary that contains the predicted cluster for each data ID.
        """

        ###
        ### GET PARAMETERS
        ###

        # Store `self.constraints_manager` and `self.list_of_data_IDs`.
        if not isinstance(constraints_manager, AbstractConstraintsManager):
            raise ValueError("The `constraints_manager` parameter has to be an `AbstractConstraintsManager` type.")
        self.constraints_manager: AbstractConstraintsManager = constraints_manager
        self.list_of_data_IDs: List[str] = self.constraints_manager.get_list_of_managed_data_IDs()

        # Store `self.vectors`.
        if not isinstance(vectors, dict):
            raise ValueError("The `vectors` parameter has to be a `dict` type.")
        self.vectors: Dict[str, csr_matrix] = vectors

        # Store `self.nb_clusters`.
        if nb_clusters is not None:
            raise ValueError("The `nb_clusters` should be `None` for DBScan clustering.")
        self.nb_clusters: Optional[int] = None

        ###
        ### COMPUTE DISTANCE
        ###

        # Compute pairwise distances.
        matrix_of_pairwise_distances: np.ndarray = pairwise_distances(
            X=vstack([self.vectors[data_ID] for data_ID in self.constraints_manager.get_list_of_managed_data_IDs()]),
            metric="euclidean",  # TODO: get different pairwise_distances config in **kargs.
        )
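        # NB: the row/column order of `matrix_of_pairwise_distances` follows
        # `get_list_of_managed_data_IDs()`, which is why the same iteration order is reused below.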

        # Format pairwise distances in a dictionary and store `self.dict_of_pairwise_distances`.
        self.dict_of_pairwise_distances: Dict[str, Dict[str, float]] = {
            vector_ID1: {
                vector_ID2: float(matrix_of_pairwise_distances[i1, i2])
                for i2, vector_ID2 in enumerate(self.constraints_manager.get_list_of_managed_data_IDs())
            }
            for i1, vector_ID1 in enumerate(self.constraints_manager.get_list_of_managed_data_IDs())
        }

        ###
        ### INITIALIZE VARIABLES
        ###

        # Initialize `self.dict_of_predicted_clusters`.
        self.dict_of_predicted_clusters = {}

        # To assign "CORE", "SINGLE_CORE" or "NOISE" labels to the points.
        self.dict_of_data_IDs_labels: Dict[str, str] = {data_ID: "UNLABELED" for data_ID in self.list_of_data_IDs}

        # To store the list of points of each computed local cluster.
        self.dict_of_local_clusters: Dict[str, List[str]] = {}

        # To store the list of points of each computed core local cluster.
        self.dict_of_core_local_clusters: Dict[str, List[str]] = {data_ID: [] for data_ID in self.list_of_data_IDs}

        ###
        ### CREATE LOCAL CLUSTERS
        ###
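
        # Step 1: each point with at least `min_samples` neighbors within `eps` seeds a local cluster
        # made of its neighborhood; if the neighborhood contains a CANNOT_LINK pair, every point in it
        # becomes its own single-point cluster instead (and is excluded from later neighborhoods).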
        for possible_core_ID in self.list_of_data_IDs:
            if self.dict_of_data_IDs_labels[possible_core_ID] != "SINGLE_CORE":
                # Points involved in a Cannot-link constraint are not associated to other points in this step.
                list_of_possible_neighbors: List[str] = [
                    neighbor_ID
                    for neighbor_ID in self.list_of_data_IDs
                    if self.dict_of_data_IDs_labels[neighbor_ID] != "SINGLE_CORE"
                ]

                # Compute distances to other possible neighbors.
                distances_to_possible_neighbors: Dict[str, float] = {
                    neighbor_ID: self.dict_of_pairwise_distances[possible_core_ID][neighbor_ID]
                    for neighbor_ID in list_of_possible_neighbors
                }

                # Keep only points within the radius of eps as neighbors.
                list_of_neighbors_ID: List[str] = [
                    neighbor_ID
                    for neighbor_ID in list_of_possible_neighbors
                    if distances_to_possible_neighbors[neighbor_ID] <= self.eps
                ]

                # Get the lists of incompatible data IDs, used to decide whether the points have to be separated into different clusters.
                not_compatible_cluster_IDs: List[List[str]] = [
                    [
                        data_ID_i
                        for data_ID_i in list_of_neighbors_ID
                        if (
                            self.constraints_manager.get_inferred_constraint(
                                data_ID1=data_ID_j,
                                data_ID2=data_ID_i,
                            )
                            == "CANNOT_LINK"
                        )
                    ]
                    for data_ID_j in list_of_neighbors_ID
                ]

                # Check if there is a Cannot-link constraint between points in the neighborhood.
                no_conflict = True
                for neighborhood_not_compatible_IDs in not_compatible_cluster_IDs:
                    if neighborhood_not_compatible_IDs:
                        no_conflict = False
                        break

                if len(list_of_neighbors_ID) < self.min_samples:
                    self.dict_of_data_IDs_labels[possible_core_ID] = "NOISE"

                elif not no_conflict:
                    for neighbor_ID in list_of_neighbors_ID:
                        # Each point of the neighborhood will be a single core point cluster
                        # and won't be involved in other clusters in this step.
                        self.dict_of_data_IDs_labels[neighbor_ID] = "SINGLE_CORE"
                        self.dict_of_local_clusters[neighbor_ID] = [neighbor_ID]

                else:
                    self.dict_of_data_IDs_labels[possible_core_ID] = "CORE"
                    self.dict_of_local_clusters[possible_core_ID] = list_of_neighbors_ID

        ###
        ### MERGE LOCAL CLUSTERS UNDER MUST-LINK CONSTRAINTS
        ###
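
        # Step 2: local clusters whose points are tied by MUST_LINK constraints are merged into
        # "core local clusters"; each point is assigned to at most one core local cluster.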

        # Get, for each point, the list of data IDs it is linked to by a Must-link constraint.
        compatible_cluster_IDs: Dict[str, List[str]] = {
            data_ID_j: [
                data_ID_i
                for data_ID_i in self.list_of_data_IDs
                if (
                    self.constraints_manager.get_inferred_constraint(
                        data_ID1=data_ID_j,
                        data_ID2=data_ID_i,
                    )
                    == "MUST_LINK"
                )
            ]
            for data_ID_j in self.list_of_data_IDs
        }

        # Get the list of local clusters that each point belongs to.
        clusters_of_data_IDs: Dict[str, List[str]] = {
            data_ID_j: [
                cluster_id
                for cluster_id in self.dict_of_local_clusters.keys()
                if (data_ID_j in self.dict_of_local_clusters[cluster_id])
            ]
            for data_ID_j in self.list_of_data_IDs
        }

        # Initialize a variable in order to analyze the Must-link constraints of a point only once.
        list_of_analyzed_IDs: List[str] = []

        # Initialize a variable in order not to take one point into account in several core local clusters.
        dict_of_assigned_local_cluster: Dict[str, str] = {data_ID: "NONE" for data_ID in self.list_of_data_IDs}

        for data_ID_i in self.list_of_data_IDs:
            if data_ID_i not in list_of_analyzed_IDs:
                if compatible_cluster_IDs[data_ID_i]:
                    # Choose a coherent ID of core local cluster corresponding to a local cluster ID of data_ID_i.

                    # Initialize the ID of the potential local cluster of data_ID_i and the list of involved points.
                    local_cluster_i_points: List[str] = []

                    if self.dict_of_data_IDs_labels[data_ID_i] == "NOISE":
                        data_ID_i_cluster = data_ID_i
                        local_cluster_i_points = [data_ID_i]

                    elif data_ID_i in self.dict_of_local_clusters.keys():
                        data_ID_i_cluster = data_ID_i
                        local_cluster_i_points = self.dict_of_local_clusters[data_ID_i]

                    else:
                        # Choose a local cluster ID that data_ID_i belongs to,
                        # and preferably one that is not already part of a core local cluster.
                        data_ID_i_cluster = clusters_of_data_IDs[data_ID_i][0]
                        for cluster_i_id in clusters_of_data_IDs[data_ID_i]:
                            if dict_of_assigned_local_cluster[cluster_i_id] == "NONE":
                                data_ID_i_cluster = cluster_i_id
                                break
                        local_cluster_i_points = self.dict_of_local_clusters[data_ID_i_cluster]

                    for data_ID_j in compatible_cluster_IDs[data_ID_i]:
                        if self.dict_of_data_IDs_labels[data_ID_j] == "NOISE":
                            # Merge all the available points of the clusters involved in a Must-link constraint.
                            list_of_core_cluster_points = []
                            for data_ID_k in local_cluster_i_points:
                                if dict_of_assigned_local_cluster[data_ID_k] == "NONE":
                                    list_of_core_cluster_points.append(data_ID_k)
                                    dict_of_assigned_local_cluster[data_ID_k] = data_ID_i_cluster

                            self.dict_of_core_local_clusters[data_ID_i_cluster] = list(
                                set(
                                    self.dict_of_core_local_clusters[data_ID_i_cluster]
                                    + list_of_core_cluster_points
                                    + [data_ID_i, data_ID_j]
                                )
                            )
                        else:
                            # Initialize the ID of the potential local cluster of data_ID_j and the list of involved points.
                            local_cluster_j_points = []

                            if data_ID_j in self.dict_of_local_clusters.keys():
                                local_cluster_j_points = self.dict_of_local_clusters[data_ID_j]

                            else:
                                # Choose a local cluster ID that data_ID_j belongs to,
                                # and preferably one that is not already part of a core local cluster.
                                data_ID_j_cluster = clusters_of_data_IDs[data_ID_j][0]
                                for cluster_j_id in clusters_of_data_IDs[data_ID_j]:
                                    if dict_of_assigned_local_cluster[cluster_j_id] == "NONE":
                                        data_ID_j_cluster = cluster_j_id
                                        break
                                local_cluster_j_points = self.dict_of_local_clusters[data_ID_j_cluster]

                            # Merge all the available points of the clusters involved in a Must-link constraint.
                            list_of_core_cluster_points = []
                            for data_ID_l in list(set(local_cluster_i_points + local_cluster_j_points)):
                                if dict_of_assigned_local_cluster[data_ID_l] == "NONE":
                                    list_of_core_cluster_points.append(data_ID_l)
                                    dict_of_assigned_local_cluster[data_ID_l] = data_ID_i_cluster

                            self.dict_of_core_local_clusters[data_ID_i_cluster] = list(
                                set(
                                    self.dict_of_core_local_clusters[data_ID_i_cluster]
                                    + list_of_core_cluster_points
                                    + [data_ID_i, data_ID_j]
                                )
                            )

                # Mark the current point as analyzed in order not to have it in two clusters.
                list_of_analyzed_IDs.append(data_ID_i)

        # Clean the `dict_of_core_local_clusters` variable.
        for data_ID in self.list_of_data_IDs:
            if not self.dict_of_core_local_clusters[data_ID]:
                # Clean by deleting entries of non-existing core local clusters.
                self.dict_of_core_local_clusters.pop(data_ID)
            elif dict_of_assigned_local_cluster[data_ID] != data_ID:
                # Clean by deleting core local cluster entries corresponding to another, already created core cluster.
                self.dict_of_core_local_clusters.pop(data_ID)

        # Clean the `dict_of_core_local_clusters` variable by removing single-point clusters,
        # because they don't make sense for a Must-link constraint.
        for potential_single_data_ID in self.list_of_data_IDs:
            if (
                potential_single_data_ID in self.dict_of_core_local_clusters.keys()
                and len(self.dict_of_core_local_clusters[potential_single_data_ID]) < 2
            ):
                self.dict_of_core_local_clusters.pop(potential_single_data_ID)

        ###
        ### MERGE LOCAL CLUSTERS UNDER CANNOT-LINK CONSTRAINTS
        ###
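
        # Step 3: each core local cluster repeatedly absorbs its closest remaining local cluster,
        # stopping as soon as the closest one is farther than `eps` or a CANNOT_LINK constraint
        # would be violated by the merge.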

        for core_cluster_ID in self.dict_of_core_local_clusters.keys():
            merging = True

            # While there is no conflict and there are still local clusters...
            while merging and self.dict_of_local_clusters:
                distances_to_local_clusters: Dict[str, float] = {}

                # Compute the distances between the core cluster and the local clusters.
                for local_cluster_ID in self.dict_of_local_clusters.keys():
                    # Compute the smallest distance between points of the core cluster and points of the local cluster.
                    distances_to_local_clusters[local_cluster_ID] = min(
                        [
                            self.dict_of_pairwise_distances[core_cluster_pt][local_cluster_pt]
                            for core_cluster_pt in self.dict_of_core_local_clusters[core_cluster_ID]
                            for local_cluster_pt in self.dict_of_local_clusters[local_cluster_ID]
                        ]
                    )

                # Find the closest local cluster to the core cluster.
                closest_cluster = min(distances_to_local_clusters, key=lambda x: distances_to_local_clusters[x])

                if distances_to_local_clusters[closest_cluster] > self.eps:
                    merging = False

                else:
                    # Get the lists of incompatible data IDs, used to decide whether the clusters can be merged.
                    not_compatible_IDs: List[List[str]] = [
                        [
                            data_ID_m
                            for data_ID_m in self.dict_of_local_clusters[closest_cluster]
                            if (
                                self.constraints_manager.get_inferred_constraint(
                                    data_ID1=data_ID_n,
                                    data_ID2=data_ID_m,
                                )
                                == "CANNOT_LINK"
                            )
                        ]
                        for data_ID_n in self.dict_of_core_local_clusters[core_cluster_ID]
                    ]

                    # Check if there is a Cannot-link constraint between the points.
                    no_conflict = True
                    for core_local_cluster_not_compatible_IDs in not_compatible_IDs:
                        if core_local_cluster_not_compatible_IDs:
                            no_conflict = False
                            break

                    if no_conflict:
                        # Merge the core local cluster and its closest local cluster.
                        self.dict_of_core_local_clusters[core_cluster_ID] = list(
                            set(
                                self.dict_of_core_local_clusters[core_cluster_ID]
                                + self.dict_of_local_clusters[closest_cluster]
                            )
                        )
                        self.dict_of_local_clusters.pop(closest_cluster)

                    else:
                        merging = False

        ###
        ### DEFINE FINAL CLUSTERS
        ###
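
        # Step 4: number the final clusters. Core local clusters come first, then the remaining
        # local clusters that are still large enough; leftover points are treated as noise below.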

        # Consider the final core local clusters.
        assigned_cluster_id: int = 0
        for core_cluster in self.dict_of_core_local_clusters.keys():
            for cluster_point in self.dict_of_core_local_clusters[core_cluster]:
                self.dict_of_predicted_clusters[cluster_point] = assigned_cluster_id
            assigned_cluster_id += 1

        # Consider the remaining local clusters.
        for local_cluster in self.dict_of_local_clusters.keys():
            # Remove points that are already in a final cluster.
            points_to_remove = []
            for local_cluster_point in self.dict_of_local_clusters[local_cluster]:
                if local_cluster_point in self.dict_of_predicted_clusters.keys():
                    points_to_remove.append(local_cluster_point)
            for data_ID_to_remove in points_to_remove:
                self.dict_of_local_clusters[local_cluster].remove(data_ID_to_remove)

            # Check that the local cluster is still big enough.
            if len(self.dict_of_local_clusters[local_cluster]) >= self.min_samples:
                for local_cluster_point in self.dict_of_local_clusters[local_cluster]:
                    self.dict_of_predicted_clusters[local_cluster_point] = assigned_cluster_id
                assigned_cluster_id += 1

        # Rename clusters.
        self.dict_of_predicted_clusters = rename_clusters_by_order(
            clusters=self.dict_of_predicted_clusters,
        )
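        # NB: `rename_clusters_by_order` relabels the cluster IDs so that they form a deterministic,
        # contiguous range, which keeps the results reproducible across runs.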

        # Set the number of regular clusters.
        self.number_of_regular_clusters = np.unique(np.array(list(self.dict_of_predicted_clusters.values()))).shape[0]

        # Consider ignored points: each remaining noise point gets its own negative cluster ID.
        ignored_cluster_id: int = -1
        for potential_ignored_point in self.list_of_data_IDs:
            if potential_ignored_point not in self.dict_of_predicted_clusters:
                self.dict_of_predicted_clusters[potential_ignored_point] = ignored_cluster_id
                ignored_cluster_id -= 1

        # Set the number of single noise point clusters.
        self.number_of_single_noise_point_clusters = -(ignored_cluster_id + 1)

        # Set the total number of clusters.
        self.number_of_clusters = self.number_of_regular_clusters + self.number_of_single_noise_point_clusters

        return self.dict_of_predicted_clusters