Coverage for src\cognitivefactory\interactive_clustering_gui\backgroundtasks.py: 100.00%

229 statements  

coverage.py v7.4.3, created at 2024-03-22 23:23 +0100

# -*- coding: utf-8 -*-

"""
* Name: cognitivefactory.interactive_clustering_gui.backgroundtasks
* Description: Definition of background tasks for the interactive clustering graphical user interface.
* Author: Erwan Schild
* Created: 22/10/2021
* Licence: CeCILL-C License v1.0 (https://cecill.info/licences.fr.html)
"""


# ==============================================================================
# IMPORT PYTHON DEPENDENCIES
# ==============================================================================

import json
import os
import pathlib
import pickle  # noqa: S403
from typing import Any, Dict, List, Optional, Tuple

from filelock import FileLock
from numpy import ndarray
from scipy.sparse import csr_matrix, vstack
from sklearn.manifold import TSNE

from cognitivefactory.interactive_clustering.clustering.abstract import AbstractConstrainedClustering
from cognitivefactory.interactive_clustering.clustering.factory import clustering_factory
from cognitivefactory.interactive_clustering.constraints.binary import BinaryConstraintsManager
from cognitivefactory.interactive_clustering.sampling.abstract import AbstractConstraintsSampling
from cognitivefactory.interactive_clustering.sampling.clusters_based import ClustersBasedConstraintsSampling
from cognitivefactory.interactive_clustering.sampling.factory import sampling_factory
from cognitivefactory.interactive_clustering.utils.preprocessing import preprocess
from cognitivefactory.interactive_clustering.utils.vectorization import vectorize
from cognitivefactory.interactive_clustering_gui.models.states import ICGUIStates

# ==============================================================================
# CONFIGURE FASTAPI APPLICATION
# ==============================================================================


# Define `DATA_DIRECTORY` (where data are stored).
DATA_DIRECTORY = pathlib.Path(os.environ.get("DATA_DIRECTORY", ".data"))
DATA_DIRECTORY.mkdir(parents=True, exist_ok=True)
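
# NOTE: The storage location can be overridden through the `DATA_DIRECTORY`
# environment variable before this module is imported, for example (hypothetical
# path): `os.environ["DATA_DIRECTORY"] = "/srv/icgui-data"`.
# Each project is materialized as one subfolder of this directory.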


# ==============================================================================
# DEFINE COMMON METHODS
# ==============================================================================


###
### Utils: Get the list of existing project IDs.
###
def get_projects() -> List[str]:
    """
    Get the list of existing project IDs.
    (A project is represented by a subfolder in the `.data` folder.)

    Returns:
        List[str]: The list of existing project IDs.
    """

    # Return the list of project IDs.
    return [project_id for project_id in os.listdir(DATA_DIRECTORY) if os.path.isdir(DATA_DIRECTORY / project_id)]
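
# Illustrative usage (hypothetical project IDs):
#   get_projects()  # -> ["0bd437b1", "4a2e7f90"]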


###
### Utils: Update project status during task.
###
def update_project_status(
    project_id: str,
    task_progression: Optional[int],
    task_detail: Optional[str],
    state: Optional[ICGUIStates] = None,
) -> None:
    """
    Update project status during task.

    Args:
        project_id (str): The ID of the project.
        task_progression (Optional[int]): The progression of the updated task.
        task_detail (Optional[str]): The detail of the updated task.
        state (Optional[ICGUIStates], optional): The state of the application. Unchanged if `None`. Defaults to `None`.
    """

    # Load status file.
    with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
        project_status: Dict[str, Any] = json.load(status_fileobject_r)

    # Update status.
    project_status["task"] = (
        {
            "progression": task_progression,
            "detail": task_detail,
        }
        if (task_progression is not None)
        else None
    )
    project_status["state"] = project_status["state"] if (state is None) else state

    # Store status.
    with open(DATA_DIRECTORY / project_id / "status.json", "w") as status_fileobject_w:
        json.dump(
            project_status,
            status_fileobject_w,
            indent=4,
        )
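
# NOTE: This helper performs an unguarded read-modify-write of `status.json`;
# callers in this module always wrap it in the matching file lock, e.g.:
#   with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
#       update_project_status(project_id=project_id, task_progression=5, task_detail="...")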


# ==============================================================================
# DEFINE BACKGROUND TASKS FOR MODELIZATION UPDATE
# ==============================================================================


###
### BACKGROUND TASK: Run modelization update task.
###
def run_modelization_update_task(
    project_id: str,
) -> None:
    """
    Background task for modelization update.
    It performs the following actions: texts preprocessing, texts vectorization, constraints manager update.
    It emits messages to share progress, raise errors and announce success.

    Args:
        project_id (str): The ID of the project.
    """


    ###
    ### Check parameters.
    ###

    # Check project ID: case of an unknown project.
    if project_id not in get_projects():
        return

    # Lock status file in order to check project status for this step.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        ###
        ### Load needed data.
        ###

        # Load status file.
        with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
            project_status: Dict[str, Any] = json.load(status_fileobject_r)

        ###
        ### Check parameters.
        ###

        # Check project status: map each "*_PENDING_*" modelization state to its
        # "*_WORKING_*" counterpart; any other state does not await a modelization
        # update, so the task exits without changes.
        working_state: Optional[ICGUIStates] = None
        if project_status["state"] == ICGUIStates.INITIALIZATION_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.INITIALIZATION_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_ANNOTATION_STEP_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_ANNOTATION_STEP_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.IMPORT_AT_ITERATION_END_WITH_PENDING_MODELIZATION:
            working_state = ICGUIStates.IMPORT_AT_ITERATION_END_WITH_WORKING_MODELIZATION
        elif project_status["state"] == ICGUIStates.ANNOTATION_WITH_PENDING_MODELIZATION_WITHOUT_CONFLICTS:
            working_state = ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITHOUT_CONFLICTS
        elif project_status["state"] == ICGUIStates.ANNOTATION_WITH_PENDING_MODELIZATION_WITH_CONFLICTS:
            working_state = ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITH_CONFLICTS
        else:
            return

        # Update status.
        update_project_status(
            project_id=project_id,
            task_progression=5,
            task_detail="Lock the project for modelization update step.",
            state=working_state,
        )

    # Get current iteration.
    iteration_id: int = project_status["iteration_id"]

    ###
    ### Settings loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=10,
            task_detail="Load settings.",
        )

    # Load settings file.
    with open(DATA_DIRECTORY / project_id / "settings.json", "r") as settings_fileobject:
        settings: Dict[str, Any] = json.load(settings_fileobject)
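
    # Assumed layout of `settings.json` (illustrative): settings are keyed by
    # iteration ID, e.g. settings["0"] == {"preprocessing": {...}, "vectorization": {...},
    # "sampling": {...}, "clustering": {...}}; only the keys read below are relied upon.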


    ###
    ### Texts loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=15,
            task_detail="Load texts.",
        )

    # Load texts.
    with open(DATA_DIRECTORY / project_id / "texts.json", "r") as texts_fileobject_r:
        texts: Dict[str, Any] = json.load(texts_fileobject_r)

    ###
    ### Texts preprocessing.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=20,
            task_detail="Preprocess texts.",
        )

    # Get all unpreprocessed texts.
    dict_of_unpreprocessed_texts: Dict[str, str] = {
        text_id_before_preprocessing: text_value_before_preprocessing["text"]
        for text_id_before_preprocessing, text_value_before_preprocessing in texts.items()
    }

    # Preprocess all texts (even deleted ones).
    dict_of_preprocessed_texts: Dict[str, str] = preprocess(
        dict_of_texts=dict_of_unpreprocessed_texts,
        apply_stopwords_deletion=settings[str(iteration_id)]["preprocessing"]["apply_stopwords_deletion"],
        apply_parsing_filter=settings[str(iteration_id)]["preprocessing"]["apply_parsing_filter"],
        apply_lemmatization=settings[str(iteration_id)]["preprocessing"]["apply_lemmatization"],
        spacy_language_model=settings[str(iteration_id)]["preprocessing"]["spacy_language_model"],
    )

    # Update texts with preprocessed values.
    for text_id_with_preprocessing in texts.keys():
        texts[text_id_with_preprocessing]["text_preprocessed"] = dict_of_preprocessed_texts[text_id_with_preprocessing]

    # Store texts.
    with open(DATA_DIRECTORY / project_id / "texts.json", "w") as texts_fileobject_w:
        json.dump(
            texts,
            texts_fileobject_w,
            indent=4,
        )

    ###
    ### Texts vectorization.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=35,
            task_detail="Vectorize texts.",
        )

    # Get managed preprocessed texts.
    dict_of_managed_preprocessed_texts: Dict[str, str] = {
        text_id_before_vectorization: text_value_before_vectorization["text_preprocessed"]
        for text_id_before_vectorization, text_value_before_vectorization in texts.items()
        if text_value_before_vectorization["is_deleted"] is False
    }

    # Vectorize texts (only texts that are not deleted).
    dict_of_managed_vectors: Dict[str, csr_matrix] = vectorize(
        dict_of_texts=dict_of_managed_preprocessed_texts,
        vectorizer_type=settings[str(iteration_id)]["vectorization"]["vectorizer_type"],
        spacy_language_model=settings[str(iteration_id)]["vectorization"]["spacy_language_model"],
    )

    # Store vectors.
    with open(DATA_DIRECTORY / project_id / "vectors.pkl", "wb") as vectors_fileobject:
        pickle.dump(
            dict_of_managed_vectors,
            vectors_fileobject,
            pickle.HIGHEST_PROTOCOL,
        )

    # Convert vectors into matrix.
    vectors_ND: csr_matrix = vstack(
        dict_of_managed_vectors[text_id_with_ND] for text_id_with_ND in dict_of_managed_vectors.keys()
    )
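
    # NOTE: `vstack` stacks vectors in dictionary insertion order, so row i of
    # `vectors_ND` corresponds to the i-th key of `dict_of_managed_vectors`;
    # the 2D/3D exports below rely on this alignment via `enumerate(...)`.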


    ###
    ### Texts vectorization in 2D.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=50,
            task_detail="Reduce vectors to 2 dimensions.",
        )

    # Reduce vectors to 2 dimensions with TSNE.
    vectors_2D: ndarray = TSNE(
        n_components=2,
        # learning_rate="auto",  # Error on "scikit-learn==0.24.1"!
        init="random",
        random_state=settings[str(iteration_id)]["vectorization"]["random_seed"],
        perplexity=min(30.0, vectors_ND.shape[0] - 1),  # TSNE requirement.
    ).fit_transform(vectors_ND)
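
    # NOTE: scikit-learn's TSNE requires the perplexity to be strictly lower than
    # the number of samples, hence the min(30.0, n_samples - 1) cap above.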


    # Store 2D vectors.
    with open(DATA_DIRECTORY / project_id / "vectors_2D.json", "w") as vectors_2D_fileobject:
        json.dump(
            {
                text_id_with_2D: {
                    "x": float(vectors_2D[i_2D][0]),
                    "y": float(vectors_2D[i_2D][1]),
                }
                for i_2D, text_id_with_2D in enumerate(dict_of_managed_vectors.keys())
            },
            vectors_2D_fileobject,
            indent=4,
        )

    ###
    ### Texts vectorization in 3D.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=65,
            task_detail="Reduce vectors to 3 dimensions.",
        )

    # Reduce vectors to 3 dimensions with TSNE.
    vectors_3D: ndarray = TSNE(
        n_components=3,
        # learning_rate="auto",  # Error on "scikit-learn==0.24.1"!
        init="random",
        random_state=settings[str(iteration_id)]["vectorization"]["random_seed"],
        perplexity=min(30.0, vectors_ND.shape[0] - 1),  # TSNE requirement.
    ).fit_transform(vectors_ND)

    # Store 3D vectors.
    with open(DATA_DIRECTORY / project_id / "vectors_3D.json", "w") as vectors_3D_fileobject:
        json.dump(
            {
                text_id_with_3D: {
                    "x": float(vectors_3D[i_3D][0]),
                    "y": float(vectors_3D[i_3D][1]),
                    "z": float(vectors_3D[i_3D][2]),
                }
                for i_3D, text_id_with_3D in enumerate(dict_of_managed_vectors.keys())
            },
            vectors_3D_fileobject,
            indent=4,
        )

    ###
    ### Constraints manager regeneration.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=80,
            task_detail="(Re)generate constraints manager.",
        )

    # Initialize constraints manager with managed texts IDs.
    new_constraints_manager: BinaryConstraintsManager = BinaryConstraintsManager(
        list_of_data_IDs=list(dict_of_managed_preprocessed_texts.keys())
    )

    # Load annotated constraints.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "r") as constraints_fileobject_r:
        constraints: Dict[str, Any] = json.load(constraints_fileobject_r)

    # First, reset the `to_fix_conflict` status of all constraints.
    for constraint_id in constraints.keys():
        constraints[constraint_id]["to_fix_conflict"] = False

    # Then, update the constraints manager with "CANNOT_LINK" constraints.
    for _, constraint_CL in constraints.items():
        if constraint_CL["constraint_type"] == "CANNOT_LINK" and constraint_CL["is_hidden"] is False:
            new_constraints_manager.add_constraint(
                data_ID1=constraint_CL["data"]["id_1"],
                data_ID2=constraint_CL["data"]["id_2"],
                constraint_type="CANNOT_LINK",
            )  # No conflict can occur: at this step, the constraints manager handles only constraints of the same type.

    # Initialize conflicts counter.
    number_of_conflicts: int = 0

    # Finally, update the constraints manager with "MUST_LINK" constraints.
    for constraint_ML_id, constraint_ML in constraints.items():
        if constraint_ML["constraint_type"] == "MUST_LINK" and constraint_ML["is_hidden"] is False:
            try:
                new_constraints_manager.add_constraint(
                    data_ID1=constraint_ML["data"]["id_1"],
                    data_ID2=constraint_ML["data"]["id_2"],
                    constraint_type="MUST_LINK",
                )  # Conflicts can occur.
            except ValueError:
                # Update conflicts status.
                constraints[constraint_ML_id]["to_fix_conflict"] = True
                number_of_conflicts += 1
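
    # Illustrative conflict: if CANNOT_LINK("text_1", "text_2") is already registered,
    # then adding MUST_LINK("text_1", "text_2"), directly or through MUST_LINK
    # transitivity, raises a ValueError and is flagged above.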


    # Store new constraints manager.
    with open(DATA_DIRECTORY / project_id / "constraints_manager.pkl", "wb") as constraints_manager_fileobject:
        pickle.dump(
            new_constraints_manager,
            constraints_manager_fileobject,
            pickle.HIGHEST_PROTOCOL,
        )

    # Store updated constraints in file.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "w") as constraints_fileobject_w:
        json.dump(constraints, constraints_fileobject_w, indent=4)

    ###
    ### Store modelization inference.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=95,
            task_detail="Store modelization inference results.",
        )

    # Load modelization inference file.
    with open(DATA_DIRECTORY / project_id / "modelization.json", "r") as modelization_fileobject_r:
        modelization: Dict[str, Any] = json.load(modelization_fileobject_r)

    # Get constraints transitivity.
    constraints_transitivity: Dict[str, Dict[str, Dict[str, None]]] = (
        new_constraints_manager._constraints_transitivity  # noqa: WPS437
    )

    # Update modelization inference.
    modelization = {}
    for text_id_in_manager in new_constraints_manager.get_list_of_managed_data_IDs():
        modelization[text_id_in_manager] = {
            "MUST_LINK": list(constraints_transitivity[text_id_in_manager]["MUST_LINK"].keys()),
            "CANNOT_LINK": list(constraints_transitivity[text_id_in_manager]["CANNOT_LINK"].keys()),
        }
    for component_id, component in enumerate(new_constraints_manager.get_connected_components()):
        for text_id_in_component in component:
            modelization[text_id_in_component]["COMPONENT"] = component_id
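
    # Resulting entry shape in `modelization.json` (illustrative):
    #   "text_12": {"MUST_LINK": [...], "CANNOT_LINK": [...], "COMPONENT": 2}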


    # Store updated modelization inference in file.
    with open(DATA_DIRECTORY / project_id / "modelization.json", "w") as modelization_fileobject_w:
        json.dump(modelization, modelization_fileobject_w, indent=4)

    ###
    ### End of task.
    ###

    # Define the next state.
    end_state: Optional[ICGUIStates] = None
    if working_state == ICGUIStates.INITIALIZATION_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.CLUSTERING_TODO if (number_of_conflicts == 0) else ICGUIStates.INITIALIZATION_WITH_ERRORS
        )
    elif working_state == ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.SAMPLING_TODO if (number_of_conflicts == 0) else ICGUIStates.IMPORT_AT_SAMPLING_STEP_WITH_ERRORS
        )
    elif working_state == ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.CLUSTERING_TODO
            if (number_of_conflicts == 0)
            else ICGUIStates.IMPORT_AT_CLUSTERING_STEP_WITH_ERRORS
        )
    elif working_state == ICGUIStates.IMPORT_AT_ITERATION_END_WITH_WORKING_MODELIZATION:
        end_state = (
            ICGUIStates.ITERATION_END if (number_of_conflicts == 0) else ICGUIStates.IMPORT_AT_ITERATION_END_WITH_ERRORS
        )
    #### elif working_state in {
    ####     ICGUIStates.IMPORT_AT_ANNOTATION_STEP_WITH_WORKING_MODELIZATION,
    ####     ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITHOUT_CONFLICTS,
    ####     ICGUIStates.ANNOTATION_WITH_WORKING_MODELIZATION_WITH_CONFLICTS,
    #### }:
    else:
        end_state = (
            ICGUIStates.ANNOTATION_WITH_UPTODATE_MODELIZATION
            if (number_of_conflicts == 0)
            else ICGUIStates.ANNOTATION_WITH_OUTDATED_MODELIZATION_WITH_CONFLICTS
        )

    # Lock status file in order to update project status.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=None,
            task_detail=None,
            state=end_state,
        )



# ==============================================================================
# DEFINE BACKGROUND TASKS FOR CONSTRAINTS SAMPLING
# ==============================================================================


###
### BACKGROUND TASK: Run constraints sampling task.
###
def run_constraints_sampling_task(
    project_id: str,
) -> None:
    """
    Background task for constraints sampling.
    It performs the following action: constraints sampling.

    Args:
        project_id (str): The ID of the project.
    """

    ###
    ### Check parameters.
    ###

    # Check project ID: case of an unknown project.
    if project_id not in get_projects():
        return

    # Lock status file in order to check project iteration and project status for this step.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        # Load status file.
        with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
            project_status: Dict[str, Any] = json.load(status_fileobject_r)

        # Check project status.
        if project_status["state"] != ICGUIStates.SAMPLING_PENDING:
            return

        # Update status.
        update_project_status(
            project_id=project_id,
            task_progression=5,
            task_detail="Lock the project for constraints sampling step.",
            state=ICGUIStates.SAMPLING_WORKING,
        )

    # Get current iteration.
    iteration_id: int = project_status["iteration_id"]

    ###
    ### Settings loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=10,
            task_detail="Load settings.",
        )

    # Load settings file.
    with open(DATA_DIRECTORY / project_id / "settings.json", "r") as settings_fileobject:
        settings: Dict[str, Any] = json.load(settings_fileobject)

    # Get previous iteration ID.
    previous_iteration_id: int = iteration_id - 1

    ###
    ### Constraints manager loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=20,
            task_detail="Load constraints manager.",
        )

    # Load constraints manager.
    with open(DATA_DIRECTORY / project_id / "constraints_manager.pkl", "rb") as constraints_manager_fileobject:
        constraints_manager: BinaryConstraintsManager = pickle.load(  # noqa: S301  # Usage of Pickle
            constraints_manager_fileobject
        )
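
    # NOTE: Pickle is only used here to reload artifacts this application wrote
    # itself; loading pickle files from untrusted sources would be unsafe (hence
    # the S301/S403 waivers).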


    ###
    ### Clustering results loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=30,
            task_detail="Load previous clustering results.",
        )

    # Get previous clustering result.
    with open(DATA_DIRECTORY / project_id / "clustering.json", "r") as clustering_fileobject:
        clustering_results_for_previous_iteration: Dict[str, int] = json.load(clustering_fileobject)[
            str(previous_iteration_id)
        ]

    ###
    ### Vectors loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=40,
            task_detail="Load vectors.",
        )

    # Load vectors.
    with open(DATA_DIRECTORY / project_id / "vectors.pkl", "rb") as vectors_fileobject:
        dict_of_managed_vectors: Dict[str, csr_matrix] = pickle.load(  # noqa: S301  # Usage of Pickle
            vectors_fileobject
        )

    ###
    ### Constraints sampling initialization.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=50,
            task_detail="Initialize constraints sampler.",
        )

    # Initialize constraints sampler.
    kwargs_sampling_init: Dict[str, Any] = (
        settings[str(iteration_id)]["sampling"]["init_kargs"]
        if (settings[str(iteration_id)]["sampling"]["init_kargs"] is not None)
        else {}
    )
    sampler: AbstractConstraintsSampling = (
        ClustersBasedConstraintsSampling(**kwargs_sampling_init)
        if (settings[str(iteration_id)]["sampling"]["algorithm"] == "custom")
        else sampling_factory(
            algorithm=settings[str(iteration_id)]["sampling"]["algorithm"],
            random_seed=settings[str(iteration_id)]["sampling"]["random_seed"],
            **kwargs_sampling_init,
        )
    )
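
    # The "custom" algorithm maps to ClustersBasedConstraintsSampling; any other
    # value is resolved by sampling_factory. `init_kargs` (None normalized to {})
    # is forwarded verbatim as constructor keyword arguments.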


    ###
    ### Constraints sampling.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=60,
            task_detail="Sample {nb} pairs of texts to annotate.".format(
                nb=str(settings[str(iteration_id)]["sampling"]["nb_to_select"])
            ),
        )

    # Sample pairs of data to annotate.
    sampling_result: List[Tuple[str, str]] = sampler.sample(
        constraints_manager=constraints_manager,
        nb_to_select=settings[str(iteration_id)]["sampling"]["nb_to_select"],
        clustering_result=clustering_results_for_previous_iteration,
        vectors=dict_of_managed_vectors,
    )

    # If needed: complete with some random pairs of data IDs.
    if len(sampling_result) < settings[str(iteration_id)]["sampling"]["nb_to_select"]:
        with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
            update_project_status(
                project_id=project_id,
                task_progression=75,
                task_detail="Need to complete with {nb} random pairs of texts.".format(
                    nb=str(settings[str(iteration_id)]["sampling"]["nb_to_select"] - len(sampling_result))
                ),
            )
        sampling_result += [
            random_sample
            for random_sample in sampling_factory(
                algorithm="random",
                random_seed=settings[str(iteration_id)]["sampling"]["random_seed"],
            ).sample(
                constraints_manager=constraints_manager,
                nb_to_select=settings[str(iteration_id)]["sampling"]["nb_to_select"] - len(sampling_result),
            )
            if random_sample not in sampling_result
        ]

    ###
    ### Sampling results storage.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=90,
            task_detail="Store sampling results and prepare annotations.",
        )

    # Load sampling results file.
    with open(DATA_DIRECTORY / project_id / "sampling.json", "r") as sampling_fileobject_r:
        sampling_results: Dict[str, List[str]] = json.load(sampling_fileobject_r)

    # Load constraints file.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "r") as constraints_fileobject_r:
        constraints: Dict[str, Dict[str, Any]] = json.load(constraints_fileobject_r)

    # Initialize sampling result for this iteration.
    sampling_results[str(iteration_id)] = []

    # For each sampled pair of texts to annotate...
    for data_ID1, data_ID2 in sampling_result:
        # Define the constraint ID.
        constraint_id: str = "({data_ID1_str},{data_ID2_str})".format(
            data_ID1_str=data_ID1,
            data_ID2_str=data_ID2,
        )

        # Add the constraint ID to the sampling results.
        sampling_results[str(iteration_id)].append(constraint_id)

        # Initialize the constraint if not already known.
        if constraint_id not in constraints.keys():
            constraints[constraint_id] = {
                "data": {
                    "id_1": data_ID1,
                    "id_2": data_ID2,
                },
                "constraint_type": None,
                "constraint_type_previous": [],
                "is_hidden": False,  # Set to True when a text of the pair is deleted.
                "to_annotate": False,
                "to_review": False,
                "to_fix_conflict": False,
                "comment": "",
                "date_of_update": None,
                "iteration_of_sampling": iteration_id,
            }
        constraints[constraint_id]["to_annotate"] = True

    # Store sampling results.
    with open(DATA_DIRECTORY / project_id / "sampling.json", "w") as sampling_fileobject_w:
        json.dump(
            sampling_results,
            sampling_fileobject_w,
            indent=4,
        )

    # Store constraints results.
    with open(DATA_DIRECTORY / project_id / "constraints.json", "w") as constraints_fileobject_w:
        json.dump(
            constraints,
            constraints_fileobject_w,
            indent=4,
        )

    ###
    ### End of task.
    ###

    # Lock status file in order to update project status.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=None,
            task_detail=None,
            state=ICGUIStates.ANNOTATION_WITH_UPTODATE_MODELIZATION,
        )



# ==============================================================================
# DEFINE BACKGROUND TASKS FOR CONSTRAINED CLUSTERING
# ==============================================================================


###
### BACKGROUND TASK: Run constrained clustering task.
###
def run_constrained_clustering_task(
    project_id: str,
) -> None:
    """
    Background task for constrained clustering.
    It performs the following action: constrained clustering.

    Args:
        project_id (str): The ID of the project.
    """

    ###
    ### Check parameters.
    ###

    # Check project ID: case of an unknown project.
    if project_id not in get_projects():
        return

    # Lock status file in order to check project status for this step.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        # Load status file.
        with open(DATA_DIRECTORY / project_id / "status.json", "r") as status_fileobject_r:
            project_status: Dict[str, Any] = json.load(status_fileobject_r)

        # Check project status.
        if project_status["state"] != ICGUIStates.CLUSTERING_PENDING:
            return

        # Update status.
        update_project_status(
            project_id=project_id,
            task_progression=5,
            task_detail="Lock the project for constrained clustering step.",
            state=ICGUIStates.CLUSTERING_WORKING,
        )

    # Get current iteration.
    iteration_id: int = project_status["iteration_id"]

    ###
    ### Settings loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=10,
            task_detail="Load settings.",
        )

    # Load settings file.
    with open(DATA_DIRECTORY / project_id / "settings.json", "r") as settings_fileobject:
        settings: Dict[str, Any] = json.load(settings_fileobject)

    ###
    ### Constraints manager loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=20,
            task_detail="Load constraints manager.",
        )

    # Load constraints manager.
    with open(DATA_DIRECTORY / project_id / "constraints_manager.pkl", "rb") as constraints_manager_fileobject:
        constraints_manager: BinaryConstraintsManager = pickle.load(  # noqa: S301  # Usage of Pickle
            constraints_manager_fileobject
        )

    ###
    ### Vectors loading.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=30,
            task_detail="Load vectors.",
        )

    # Load vectors.
    with open(DATA_DIRECTORY / project_id / "vectors.pkl", "rb") as vectors_fileobject:
        dict_of_managed_vectors: Dict[str, csr_matrix] = pickle.load(  # noqa: S301  # Usage of Pickle
            vectors_fileobject
        )

    ###
    ### Clustering model initialization.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=40,
            task_detail="Initialize clustering model.",
        )

    # Initialize clustering model.
    kwargs_clustering_init: Dict[str, Any] = (
        settings[str(iteration_id)]["clustering"]["init_kargs"]
        if (settings[str(iteration_id)]["clustering"]["init_kargs"] is not None)
        else {}
    )
    clustering_model: AbstractConstrainedClustering = clustering_factory(
        algorithm=settings[str(iteration_id)]["clustering"]["algorithm"],
        random_seed=settings[str(iteration_id)]["clustering"]["random_seed"],
        **kwargs_clustering_init,
    )

    ###
    ### Constrained clustering.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=50,
            task_detail="Run constrained clustering.",
        )

    # Run constrained clustering.
    clustering_result: Dict[str, int] = clustering_model.cluster(
        constraints_manager=constraints_manager,
        vectors=dict_of_managed_vectors,
        nb_clusters=settings[str(iteration_id)]["clustering"]["nb_clusters"],
    )
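
    # `clustering_result` maps each managed text ID to its cluster ID, e.g.
    # (illustrative): {"text_0": 0, "text_1": 2, "text_2": 0, ...}.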


    ###
    ### Clustering results storage.
    ###
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=90,
            task_detail="Store clustering results.",
        )

    # Load clustering results file.
    with open(DATA_DIRECTORY / project_id / "clustering.json", "r") as clustering_fileobject_r:
        history_of_clustering_results: Dict[str, Dict[str, int]] = json.load(clustering_fileobject_r)

    # Update clustering results.
    history_of_clustering_results[str(iteration_id)] = clustering_result

    # Store clustering results.
    with open(DATA_DIRECTORY / project_id / "clustering.json", "w") as clustering_fileobject_w:
        json.dump(
            history_of_clustering_results,
            clustering_fileobject_w,
            indent=4,
        )

    ###
    ### End of task.
    ###

    # Lock status file in order to update project status.
    with FileLock(str(DATA_DIRECTORY / project_id / "status.json.lock")):
        update_project_status(
            project_id=project_id,
            task_progression=None,
            task_detail=None,
            state=ICGUIStates.ITERATION_END,
        )