Coverage for src\cognitivefactory\interactive_clustering_gui\backgroundtasks.py: 100.00% (229 statements)
coverage.py v7.4.3, created at 2024-03-22 23:23 +0100
# -*- coding: utf-8 -*-

"""
* Name: cognitivefactory.interactive_clustering_gui.backgroundtasks
* Description: Definition of background tasks for the interactive clustering graphical user interface.
* Author: Erwan Schild
* Created: 22/10/2021
* Licence: CeCILL-C License v1.0 (https://cecill.info/licences.fr.html)
"""

# ==============================================================================
# IMPORT PYTHON DEPENDENCIES
# ==============================================================================

import json
import os
import pathlib
import pickle  # noqa: S403

from typing import Any, Dict, List, Optional, Tuple

from filelock import FileLock
from numpy import ndarray
from scipy.sparse import csr_matrix, vstack
from sklearn.manifold import TSNE

from cognitivefactory.interactive_clustering.clustering.abstract import AbstractConstrainedClustering
from cognitivefactory.interactive_clustering.clustering.factory import clustering_factory
from cognitivefactory.interactive_clustering.constraints.binary import BinaryConstraintsManager
from cognitivefactory.interactive_clustering.sampling.abstract import AbstractConstraintsSampling
from cognitivefactory.interactive_clustering.sampling.clusters_based import ClustersBasedConstraintsSampling
from cognitivefactory.interactive_clustering.sampling.factory import sampling_factory
from cognitivefactory.interactive_clustering.utils.preprocessing import preprocess
from cognitivefactory.interactive_clustering.utils.vectorization import vectorize
from cognitivefactory.interactive_clustering_gui.models.states import ICGUIStates

# ==============================================================================
# CONFIGURE FASTAPI APPLICATION
# ==============================================================================

# Define `DATA_DIRECTORY` (where data are stored).
DATA_DIRECTORY = pathlib.Path(os.environ.get("DATA_DIRECTORY", ".data"))
DATA_DIRECTORY.mkdir(parents=True, exist_ok=True)

# ==============================================================================
# DEFINE COMMON METHODS
# ==============================================================================

###
### Utils: Get the list of existing project IDs.
###
def get_projects() -> List[str]:
    """
    Get the list of existing project IDs.
    (A project is represented by a subfolder in the `.data` folder.)

    Returns:
        List[str]: The list of existing project IDs.
    """

    # Return the list of project IDs.
    return [project_id for project_id in os.listdir(DATA_DIRECTORY) if os.path.isdir(DATA_DIRECTORY / project_id)]
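
# NOTE (editor): a minimal sketch of the assumed on-disk layout, for reference.
# Each project is a subfolder of `DATA_DIRECTORY` holding the JSON/pickle files
# used throughout this module (file names come from this module; the project ID
# is a hypothetical example):
#
#     .data/
#     └── 0a95bc23/                   # hypothetical project ID
#         ├── status.json             # current state and task progression (plus a status.json.lock file)
#         ├── settings.json           # per-iteration preprocessing/vectorization/sampling/clustering settings
#         ├── texts.json              # raw and preprocessed texts
#         ├── constraints.json        # annotated MUST_LINK / CANNOT_LINK constraints
#         ├── constraints_manager.pkl
#         ├── vectors.pkl / vectors_2D.json / vectors_3D.json
#         ├── sampling.json
#         ├── clustering.json
#         └── modelization.json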
###
### Utils: Update project status during task.
###
def update_project_status(
    project_id: str,
    task_progression: Optional[int],
    task_detail: Optional[str],
    state: Optional[ICGUIStates] = None,
) -> None:
    """
    Update project status during task.

    Args:
        project_id (str): The ID of the project.
        task_progression (Optional[int]): The progression of the updated task.
        task_detail (Optional[str]): The detail of the updated task.
        state (Optional[ICGUIStates], optional): The state of the application. Unchanged if `None`. Defaults to `None`.
    """

    # Load status file.
    with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
        project_status: Dict[str, Any] = json.load(status_fileobject_r)

    # Update status.
    project_status["task"] = (
        {
            "progression": task_progression,
            "detail": task_detail,
        }
        if (task_progression is not None)
        else None
    )
    project_status["state"] = project_status["state"] if (state is None) else state

    # Store status.
    with open(DATA_DIRECTORY / project_id / "status.json", "w") as status_fileobject_w:
        json.dump(
            project_status,
            status_fileobject_w,
            indent=4,
        )
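
# NOTE (editor): a hedged example of what `status.json` might look like after a
# call such as `update_project_status(project_id="0a95bc23", task_progression=10,
# task_detail="Load settings.")`. Field names come from this module; the
# `iteration_id` and `state` values are illustrative:
#
#     {
#         "iteration_id": 2,
#         "state": "SAMPLING_WORKING",
#         "task": {
#             "progression": 10,
#             "detail": "Load settings."
#         }
#     }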
# ==============================================================================
# DEFINE BACKGROUND TASKS FOR MODELIZATION UPDATE
# ==============================================================================

###
### BACKGROUND TASK: Run modelization update task.
###
def run_modelization_update_task(
    project_id: str,
) -> None:
    """
    Background task for modelization update.
    It performs the following actions: text preprocessing, text vectorization, constraints manager update.
    It emits messages to share progress, raise errors and announce success.

    Args:
        project_id (str): The ID of the project.
    """

    ###
    ### Check parameters.
    ###

    # Check project ID: case of unknown project.
    if project_id not in get_projects():
        return

    # Lock status file in order to check project status for this step.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        ###
        ### Load needed data.
        ###

        # Load status file.
        with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
            project_status: Dict[str, Any] = json.load(status_fileobject_r)

        ###
        ### Check parameters.
        ###

        # Check project status.
        working_state: Optional[ICGUIStates] = None
        if project_status["state"] == ICGUIStates.INITIALIZATION_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.INITIALIZATION_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_ANNOTATION_STEP_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_ANNOTATION_STEP_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_ITERATION_END_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_ITERATION_END_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.ANNOTATION_WITH_PENDING_MODELIZATION_WITHOUT_CONFLICTS:
            working_state = ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITHOUT_CONFLICTS
        elif project_status["state"] == ICGUIStates.ANNOTATION_WITH_PENDING_MODELIZATION_WITH_CONFLICTS:
            working_state = ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITH_CONFLICTS
        else:
            return

        # Update status.
        update_project_status(
            project_id=project_id,
            task_progression=5,
            task_detail="Lock the project for modelization update step.",
            state=working_state,
        )

    # Get current iteration.
    iteration_id: int = project_status["iteration_id"]

    ###
    ### Settings loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=10,
            task_detail="Load settings.",
        )

    # Load settings file.
    with open(DATA_DIRECTORY / project_id / "settings.json", "r") as settings_fileobject:
        settings: Dict[str, Any] = json.load(settings_fileobject)

    ###
    ### Texts loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=15,
            task_detail="Load texts.",
        )

    # Load texts.
    with open(DATA_DIRECTORY / project_id / "texts.json", "r") as texts_fileobject_r:
        texts: Dict[str, Any] = json.load(texts_fileobject_r)

    ###
    ### Texts preprocessing.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=20,
            task_detail="Preprocess texts.",
        )

    # Get all unpreprocessed texts.
    dict_of_unpreprocessed_texts: Dict[str, str] = {
        text_id_before_preprocessing: text_value_before_preprocessing["text"]
        for text_id_before_preprocessing, text_value_before_preprocessing in texts.items()
    }

    # Preprocess all texts (even if the text is deleted).
    dict_of_preprocessed_texts: Dict[str, str] = preprocess(
        dict_of_texts=dict_of_unpreprocessed_texts,
        apply_stopwords_deletion=settings[str(iteration_id)]["preprocessing"]["apply_stopwords_deletion"],
        apply_parsing_filter=settings[str(iteration_id)]["preprocessing"]["apply_parsing_filter"],
        apply_lemmatization=settings[str(iteration_id)]["preprocessing"]["apply_lemmatization"],
        spacy_language_model=settings[str(iteration_id)]["preprocessing"]["spacy_language_model"],
    )

    # Update texts with preprocessed values.
    for text_id_with_preprocessing in texts.keys():
        texts[text_id_with_preprocessing]["text_preprocessed"] = dict_of_preprocessed_texts[text_id_with_preprocessing]

    # Store texts.
    with open(DATA_DIRECTORY / project_id / "texts.json", "w") as texts_fileobject_w:
        json.dump(
            texts,
            texts_fileobject_w,
            indent=4,
        )

    ###
    ### Texts vectorization.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=35,
            task_detail="Vectorize texts.",
        )

    # Get managed preprocessed texts.
    dict_of_managed_preprocessed_texts: Dict[str, str] = {
        text_id_before_vectorization: text_value_before_vectorization["text_preprocessed"]
        for text_id_before_vectorization, text_value_before_vectorization in texts.items()
        if text_value_before_vectorization["is_deleted"] is False
    }

    # Vectorize texts (only if the text is not deleted).
    dict_of_managed_vectors: Dict[str, csr_matrix] = vectorize(
        dict_of_texts=dict_of_managed_preprocessed_texts,
        vectorizer_type=settings[str(iteration_id)]["vectorization"]["vectorizer_type"],
        spacy_language_model=settings[str(iteration_id)]["vectorization"]["spacy_language_model"],
    )

    # Store vectors.
    with open(DATA_DIRECTORY / project_id / "vectors.pkl", "wb") as vectors_fileobject:
        pickle.dump(
            dict_of_managed_vectors,
            vectors_fileobject,
            pickle.HIGHEST_PROTOCOL,
        )

    # Convert vectors into a matrix.
    vectors_ND: csr_matrix = vstack(
        dict_of_managed_vectors[text_id_with_ND] for text_id_with_ND in dict_of_managed_vectors.keys()
    )
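
    # NOTE (editor): `vstack` stacks the per-text sparse row vectors into a single
    # `(n_texts, n_features)` CSR matrix. Row i corresponds to the i-th key of
    # `dict_of_managed_vectors` (dicts preserve insertion order), which is why the
    # same key order is reused below to map the TSNE output rows back to text IDs.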
    ###
    ### Texts vectorization in 2D.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=50,
            task_detail="Reduce vectors to 2 dimensions.",
        )

    # Reduce vectors to 2 dimensions with TSNE.
    vectors_2D: ndarray = TSNE(
        n_components=2,
        # learning_rate="auto",  # Error on "scikit-learn==0.24.1"!
        init="random",
        random_state=settings[str(iteration_id)]["vectorization"]["random_seed"],
        perplexity=min(30.0, vectors_ND.shape[0] - 1),  # TSNE requirement.
    ).fit_transform(vectors_ND)

    # Store 2D vectors.
    with open(DATA_DIRECTORY / project_id / "vectors_2D.json", "w") as vectors_2D_fileobject:
        json.dump(
            {
                text_id_with_2D: {
                    "x": float(vectors_2D[i_2D][0]),
                    "y": float(vectors_2D[i_2D][1]),
                }
                for i_2D, text_id_with_2D in enumerate(dict_of_managed_vectors.keys())
            },
            vectors_2D_fileobject,
            indent=4,
        )
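
    # NOTE (editor): a hedged example of the resulting `vectors_2D.json` (text IDs
    # and coordinates are illustrative):
    #
    #     {
    #         "0": {"x": -12.3, "y": 4.56},
    #         "1": {"x": 7.89, "y": -0.12}
    #     }
    #
    # The 3D file below follows the same layout with an extra "z" coordinate.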
    ###
    ### Texts vectorization in 3D.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=65,
            task_detail="Reduce vectors to 3 dimensions.",
        )

    # Reduce vectors to 3 dimensions with TSNE.
    vectors_3D: ndarray = TSNE(
        n_components=3,
        # learning_rate="auto",  # Error on "scikit-learn==0.24.1"!
        init="random",
        random_state=settings[str(iteration_id)]["vectorization"]["random_seed"],
        perplexity=min(30.0, vectors_ND.shape[0] - 1),  # TSNE requirement.
    ).fit_transform(vectors_ND)

    # Store 3D vectors.
    with open(DATA_DIRECTORY / project_id / "vectors_3D.json", "w") as vectors_3D_fileobject:
        json.dump(
            {
                text_id_with_3D: {
                    "x": float(vectors_3D[i_3D][0]),
                    "y": float(vectors_3D[i_3D][1]),
                    "z": float(vectors_3D[i_3D][2]),
                }
                for i_3D, text_id_with_3D in enumerate(dict_of_managed_vectors.keys())
            },
            vectors_3D_fileobject,
            indent=4,
        )

    ###
    ### Constraints manager regeneration.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=80,
            task_detail="(Re)generate constraints manager.",
        )

    # Initialize constraints manager with managed texts IDs.
    new_constraints_manager: BinaryConstraintsManager = BinaryConstraintsManager(
        list_of_data_IDs=list(dict_of_managed_preprocessed_texts.keys())
    )

    # Load annotated constraints.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "r") as constraints_fileobject_r:
        constraints: Dict[str, Any] = json.load(constraints_fileobject_r)

    # First, reset the `to_fix_conflict` status of all constraints.
    for constraint_id in constraints.keys():
        constraints[constraint_id]["to_fix_conflict"] = False

    # Then, update the constraints manager with "CANNOT_LINK" constraints.
    for _, constraint_CL in constraints.items():
        if constraint_CL["constraint_type"] == "CANNOT_LINK" and constraint_CL["is_hidden"] is False:
            new_constraints_manager.add_constraint(
                data_ID1=constraint_CL["data"]["id_1"],
                data_ID2=constraint_CL["data"]["id_2"],
                constraint_type="CANNOT_LINK",
            )  # No conflict can occur: at this step, the constraints manager handles only constraints of the same type.

    # Initialize conflicts counter.
    number_of_conflicts: int = 0

    # Finally, update the constraints manager with "MUST_LINK" constraints.
    for constraint_ML_id, constraint_ML in constraints.items():
        if constraint_ML["constraint_type"] == "MUST_LINK" and constraint_ML["is_hidden"] is False:
            try:
                new_constraints_manager.add_constraint(
                    data_ID1=constraint_ML["data"]["id_1"],
                    data_ID2=constraint_ML["data"]["id_2"],
                    constraint_type="MUST_LINK",
                )  # Conflicts can occur here.
            except ValueError:
                # Update conflict status.
                constraints[constraint_ML_id]["to_fix_conflict"] = True
                number_of_conflicts += 1
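
    # NOTE (editor): a hedged worked example of how a conflict arises (text IDs are
    # illustrative). Suppose the annotations contain CANNOT_LINK("1", "2"), then
    # MUST_LINK("1", "3") and MUST_LINK("2", "3"). The CANNOT_LINK is recorded first;
    # adding MUST_LINK("1", "3") succeeds, but MUST_LINK("2", "3") would, by
    # transitivity, put "1" and "2" in the same group although they are already
    # CANNOT_LINKed, so `add_constraint` raises `ValueError` and the constraint is
    # flagged with `to_fix_conflict=True` for manual review.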
    # Store the new constraints manager.
    with open(DATA_DIRECTORY / project_id / "constraints_manager.pkl", "wb") as constraints_manager_fileobject:
        pickle.dump(
            new_constraints_manager,
            constraints_manager_fileobject,
            pickle.HIGHEST_PROTOCOL,
        )

    # Store updated constraints in file.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "w") as constraints_fileobject_w:
        json.dump(constraints, constraints_fileobject_w, indent=4)

    ###
    ### Store modelization inference.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=95,
            task_detail="Store modelization inference results.",
        )

    # Load modelization inference file.
    with open(DATA_DIRECTORY / project_id / "modelization.json", "r") as modelization_fileobject_r:
        modelization: Dict[str, Any] = json.load(modelization_fileobject_r)

    # Get constraints transitivity.
    constraints_transitivity: Dict[str, Dict[str, Dict[str, None]]] = (
        new_constraints_manager._constraints_transitivity  # noqa: WPS437
    )

    # Update modelization inference.
    modelization = {}
    for text_id_in_manager in new_constraints_manager.get_list_of_managed_data_IDs():
        modelization[text_id_in_manager] = {
            "MUST_LINK": list(constraints_transitivity[text_id_in_manager]["MUST_LINK"].keys()),
            "CANNOT_LINK": list(constraints_transitivity[text_id_in_manager]["CANNOT_LINK"].keys()),
        }
    for component_id, component in enumerate(new_constraints_manager.get_connected_components()):
        for text_id_in_component in component:
            modelization[text_id_in_component]["COMPONENT"] = component_id

    # Store updated modelization inference in file.
    with open(DATA_DIRECTORY / project_id / "modelization.json", "w") as modelization_fileobject_w:
        json.dump(modelization, modelization_fileobject_w, indent=4)
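
    # NOTE (editor): a hedged example of one resulting `modelization.json` entry
    # (the IDs, the linked sets and the component index are all illustrative):
    # text "3" inferred as MUST_LINKed with "1", CANNOT_LINKed with "2", and
    # belonging to connected component 0:
    #
    #     {
    #         "3": {
    #             "MUST_LINK": ["3", "1"],
    #             "CANNOT_LINK": ["2"],
    #             "COMPONENT": 0
    #         }
    #     }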
    ###
    ### End of task.
    ###

    # Define the next state.
    end_state: Optional[ICGUIStates] = None
    if working_state == ICGUIStates.INITIALIZATION_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.CLUSTERING_TODO if (number_of_conflicts == 0) else ICGUIStates.INITIALIZATION_WITH_ERRORS
        )
    elif working_state == ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.SAMPLING_TODO if (number_of_conflicts == 0) else ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_ERRORS
        )
    elif working_state == ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.CLUSTERING_TODO
            if (number_of_conflicts == 0)
            else ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_ERRORS
        )
    elif working_state == ICGUIStates.IMPORT_AT_ITERATION_END_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.ITERATION_END if (number_of_conflicts == 0) else ICGUIStates.IMPORT_AT_ITERATION_END_WITH_ERRORS
        )
    #### elif working_state in {
    ####     ICGUIStates.IMPORT_AT_ANNOTATION_STEP_WITH_WORKING_MODELIZATION,
    ####     ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITHOUT_CONFLICTS,
    ####     ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITH_CONFLICTS,
    #### }:
    else:
        end_state = (
            ICGUIStates.ANNOTATION_WITH_UPTODATE_MODELIZATION
            if (number_of_conflicts == 0)
            else ICGUIStates.ANNOTATION_WITH_OUTDATED_MODELIZATION_WITH_CONFLICTS
        )

    # Lock status file in order to update project status.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=None,
            task_detail=None,
            state=end_state,
        )
# ==============================================================================
# DEFINE BACKGROUND TASKS FOR CONSTRAINTS SAMPLING
# ==============================================================================

###
### BACKGROUND TASK: Run constraints sampling task.
###
def run_constraints_sampling_task(
    project_id: str,
) -> None:
    """
    Background task for the constraints sampling step.
    It performs the following action: constraints sampling.

    Args:
        project_id (str): The ID of the project.
    """

    ###
    ### Check parameters.
    ###

    # Check project ID: case of unknown project.
    if project_id not in get_projects():
        return

    # Lock status file in order to check project iteration and project status for this step.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        # Load status file.
        with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
            project_status: Dict[str, Any] = json.load(status_fileobject_r)

        # Check project status.
        if project_status["state"] != ICGUIStates.SAMPLING_PENDING:
            return

        # Update status.
        update_project_status(
            project_id=project_id,
            task_progression=5,
            task_detail="Lock the project for constraints sampling step.",
            state=ICGUIStates.SAMPLING_WORKING,
        )

    # Get current iteration.
    iteration_id: int = project_status["iteration_id"]

    ###
    ### Settings loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=10,
            task_detail="Load settings.",
        )

    # Load settings file.
    with open(DATA_DIRECTORY / project_id / "settings.json", "r") as settings_fileobject:
        settings: Dict[str, Any] = json.load(settings_fileobject)

    # Get previous iteration ID.
    previous_iteration_id: int = iteration_id - 1

    ###
    ### Constraints manager loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=20,
            task_detail="Load constraints manager.",
        )

    # Load constraints manager.
    with open(DATA_DIRECTORY / project_id / "constraints_manager.pkl", "rb") as constraints_manager_fileobject:
        constraints_manager: BinaryConstraintsManager = pickle.load(  # noqa: S301  # Usage of Pickle.
            constraints_manager_fileobject
        )

    ###
    ### Clustering results loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=30,
            task_detail="Load previous clustering results.",
        )

    # Get previous clustering result.
    with open(DATA_DIRECTORY / project_id / "clustering.json", "r") as clustering_fileobject:
        clustering_results_for_previous_iteration: Dict[str, int] = json.load(clustering_fileobject)[
            str(previous_iteration_id)
        ]

    ###
    ### Vectors loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=40,
            task_detail="Load vectors.",
        )

    # Load vectors.
    with open(DATA_DIRECTORY / project_id / "vectors.pkl", "rb") as vectors_fileobject:
        dict_of_managed_vectors: Dict[str, csr_matrix] = pickle.load(  # noqa: S301  # Usage of Pickle.
            vectors_fileobject
        )

    ###
    ### Constraints sampling initialization.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=50,
            task_detail="Initialize constraints sampler.",
        )

    # Initialize constraints sampler.
    kwargs_sampling_init: Dict[str, Any] = (
        settings[str(iteration_id)]["sampling"]["init_kargs"]
        if (settings[str(iteration_id)]["sampling"]["init_kargs"] is not None)
        else {}
    )
    sampler: AbstractConstraintsSampling = (
        ClustersBasedConstraintsSampling(**kwargs_sampling_init)
        if (settings[str(iteration_id)]["sampling"]["algorithm"] == "custom")
        else sampling_factory(
            algorithm=settings[str(iteration_id)]["sampling"]["algorithm"],
            random_seed=settings[str(iteration_id)]["sampling"]["random_seed"],
            **kwargs_sampling_init,
        )
    )
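
    # NOTE (editor): a hedged example of the assumed per-iteration sampling settings
    # read above (key names come from this module; the values are illustrative):
    #
    #     settings["2"]["sampling"] == {
    #         "algorithm": "custom",
    #         "random_seed": 42,
    #         "nb_to_select": 25,
    #         "init_kargs": None,  # or a dict of keyword arguments for the sampler.
    #     }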
    ###
    ### Constraints sampling.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=60,
            task_detail="Sample {nb} pairs of texts to annotate.".format(
                nb=str(settings[str(iteration_id)]["sampling"]["nb_to_select"])
            ),
        )

    # Sample pairs of data to annotate.
    sampling_result: List[Tuple[str, str]] = sampler.sample(
        constraints_manager=constraints_manager,
        nb_to_select=settings[str(iteration_id)]["sampling"]["nb_to_select"],
        clustering_result=clustering_results_for_previous_iteration,
        vectors=dict_of_managed_vectors,
    )

    # If needed: complete with some random pairs of data IDs.
    if len(sampling_result) < settings[str(iteration_id)]["sampling"]["nb_to_select"]:
        with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
            update_project_status(
                project_id=project_id,
                task_progression=75,
                task_detail="Need to complete with {nb} random pairs of texts.".format(
                    nb=str(settings[str(iteration_id)]["sampling"]["nb_to_select"] - len(sampling_result))
                ),
            )
        sampling_result += [
            random_sample
            for random_sample in sampling_factory(
                algorithm="random",
                random_seed=settings[str(iteration_id)]["sampling"]["random_seed"],
            ).sample(
                constraints_manager=constraints_manager,
                nb_to_select=settings[str(iteration_id)]["sampling"]["nb_to_select"] - len(sampling_result),
            )
            if random_sample not in sampling_result
        ]

    ###
    ### Sampling results storage.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=90,
            task_detail="Store sampling results and prepare annotations.",
        )

    # Load sampling results file.
    with open(DATA_DIRECTORY / project_id / "sampling.json", "r") as sampling_fileobject_r:
        sampling_results: Dict[str, List[str]] = json.load(sampling_fileobject_r)

    # Load constraints file.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "r") as constraints_fileobject_r:
        constraints: Dict[str, Dict[str, Any]] = json.load(constraints_fileobject_r)

    # Initialize sampling result for this iteration.
    sampling_results[str(iteration_id)] = []

    # For each sampled pair of texts to annotate...
    for data_ID1, data_ID2 in sampling_result:
        # Define the constraint ID.
        constraint_id: str = "({data_ID1_str},{data_ID2_str})".format(
            data_ID1_str=data_ID1,
            data_ID2_str=data_ID2,
        )

        # Add the constraint ID to sampling results.
        sampling_results[str(iteration_id)].append(constraint_id)

        # Update constraints if not already known.
        if constraint_id not in constraints.keys():
            constraints[constraint_id] = {
                "data": {
                    "id_1": data_ID1,
                    "id_2": data_ID2,
                },
                "constraint_type": None,
                "constraint_type_previous": [],
                "is_hidden": False,  # Set if the text is deleted.
                "to_annotate": False,
                "to_review": False,
                "to_fix_conflict": False,
                "comment": "",
                "date_of_update": None,
                "iteration_of_sampling": iteration_id,
            }
        constraints[constraint_id]["to_annotate"] = True
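
    # NOTE (editor): constraint IDs are plain strings built from the two text IDs:
    # per the format string above, the pair ("12", "25") is stored under the key
    # "(12,25)" in both `sampling.json` and `constraints.json` (IDs are illustrative).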
    # Store sampling results.
    with open(DATA_DIRECTORY / project_id / "sampling.json", "w") as sampling_fileobject_w:
        json.dump(
            sampling_results,
            sampling_fileobject_w,
            indent=4,
        )

    # Store constraints results.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "w") as constraints_fileobject_w:
        json.dump(
            constraints,
            constraints_fileobject_w,
            indent=4,
        )

    ###
    ### End of task.
    ###

    # Lock status file in order to update project status.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=None,
            task_detail=None,
            state=ICGUIStates.ANNOTATION_WITH_UPTODATE_MODELIZATION,
        )
# ==============================================================================
# DEFINE BACKGROUND TASKS FOR CONSTRAINED CLUSTERING
# ==============================================================================

###
### BACKGROUND TASK: Run constrained clustering task.
###
def run_constrained_clustering_task(
    project_id: str,
) -> None:
    """
    Background task for the constrained clustering step.
    It performs the following action: constrained clustering.

    Args:
        project_id (str): The ID of the project.
    """

    ###
    ### Check parameters.
    ###

    # Check project ID: case of unknown project.
    if project_id not in get_projects():
        return

    # Lock status file in order to check project status for this step.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        # Load status file.
        with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
            project_status: Dict[str, Any] = json.load(status_fileobject_r)

        # Check project status.
        if project_status["state"] != ICGUIStates.CLUSTERING_PENDING:
            return

        # Update status.
        update_project_status(
            project_id=project_id,
            task_progression=5,
            task_detail="Lock the project for constrained clustering step.",
            state=ICGUIStates.CLUSTERING_WORKING,
        )

    # Get current iteration.
    iteration_id: int = project_status["iteration_id"]

    ###
    ### Settings loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=10,
            task_detail="Load settings.",
        )

    # Load settings file.
    with open(DATA_DIRECTORY / project_id / "settings.json", "r") as settings_fileobject:
        settings: Dict[str, Any] = json.load(settings_fileobject)

    ###
    ### Constraints manager loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=20,
            task_detail="Load constraints manager.",
        )

    # Load constraints manager.
    with open(DATA_DIRECTORY / project_id / "constraints_manager.pkl", "rb") as constraints_manager_fileobject:
        constraints_manager: BinaryConstraintsManager = pickle.load(  # noqa: S301  # Usage of Pickle.
            constraints_manager_fileobject
        )

    ###
    ### Vectors loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=30,
            task_detail="Load vectors.",
        )

    # Load vectors.
    with open(DATA_DIRECTORY / project_id / "vectors.pkl", "rb") as vectors_fileobject:
        dict_of_managed_vectors: Dict[str, csr_matrix] = pickle.load(  # noqa: S301  # Usage of Pickle.
            vectors_fileobject
        )

    ###
    ### Clustering model initialization.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=40,
            task_detail="Initialize clustering model.",
        )

    # Initialize clustering model.
    kwargs_clustering_init: Dict[str, Any] = (
        settings[str(iteration_id)]["clustering"]["init_kargs"]
        if (settings[str(iteration_id)]["clustering"]["init_kargs"] is not None)
        else {}
    )
    clustering_model: AbstractConstrainedClustering = clustering_factory(
        algorithm=settings[str(iteration_id)]["clustering"]["algorithm"],
        random_seed=settings[str(iteration_id)]["clustering"]["random_seed"],
        **kwargs_clustering_init,
    )

    ###
    ### Constrained clustering.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=50,
            task_detail="Run constrained clustering.",
        )

    # Run constrained clustering.
    clustering_result: Dict[str, int] = clustering_model.cluster(
        constraints_manager=constraints_manager,
        vectors=dict_of_managed_vectors,
        nb_clusters=settings[str(iteration_id)]["clustering"]["nb_clusters"],
    )
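
    # NOTE (editor): a hedged example of the shape of `clustering_result`, mapping
    # each text ID to a cluster index (IDs and assignments are illustrative):
    #
    #     {"0": 0, "1": 2, "2": 0, "3": 1}
    #
    # `clustering.json` below keeps one such mapping per iteration, keyed by the
    # iteration ID as a string.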
888 ###
889 ### Clustering results storage.
890 ###
891 with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
892 update_project_status(
893 project_id=project_id,
894 task_progression=90,
895 task_detail="Store clustering results.",
896 )
898 # Load clustering results file.
899 with open(DATA_DIRECTORY / project_id / "clustering.json", "r") as clustering_fileobject_r:
900 history_of_clustering_results: Dict[str, Dict[str, int]] = json.load(clustering_fileobject_r)
902 # Update clustering results.
903 history_of_clustering_results[str(iteration_id)] = clustering_result
905 # Store clustering results.
906 with open(DATA_DIRECTORY / project_id / "clustering.json", "w") as clustering_fileobject_w:
907 json.dump(
908 history_of_clustering_results,
909 clustering_fileobject_w,
910 indent=4,
911 )
913 ###
914 ### End of task.
915 ###
917 # Lock status file in order to update project status.
918 with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
919 update_project_status(
920 project_id=project_id,
921 task_progression=None,
922 task_detail=None,
923 state=ICGUIStates.ITERATION_END,
924 )
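
# ==============================================================================
# NOTE (editor): HEDGED USAGE SKETCH
# ==============================================================================
# A minimal sketch of how a FastAPI route might schedule one of these background
# tasks, assuming the application exposes such a route. The route path and the
# function name `start_clustering` are hypothetical; only
# `run_constrained_clustering_task`, `update_project_status` and `ICGUIStates`
# come from this module (file locking is elided for brevity):
#
#     from fastapi import BackgroundTasks, FastAPI
#
#     app = FastAPI()
#
#     @app.post("/api/projects/{project_id}/clustering")
#     async def start_clustering(project_id: str, background_tasks: BackgroundTasks):
#         # Mark the task as pending, then run it after the response is sent.
#         update_project_status(project_id, 0, "Queued.", state=ICGUIStates.CLUSTERING_PENDING)
#         background_tasks.add_task(run_constrained_clustering_task, project_id=project_id)
#         return {"project_id": project_id, "detail": "Clustering scheduled."}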