Future est une interface de Java pour représenter le résultat d’une opération asynchrone
Permet de vérifier si une tâche est terminée et de récupérer son résultat
ExecutorService
Création et Gestion des Pools de Threads
FixedThreadPool : Pool avec un nombre fixe de threads.
CachedThreadPool : Pool avec un nombre variable de threads, créant de nouveaux threads si nécessaire.
SingleThreadExecutor : Pool avec un seul thread.
importjava.util.concurrent.*;
ExecutorService executor =Executors.newSingleThreadExecutor();Future<String> future = executor.submit(()->{try{Thread.sleep(1000);// Simulation d'un traitement long}catch(InterruptedException e){thrownewRuntimeException(e);}return"Résultat du traitement";});// Vérification de l'état de la tâche// while (!future.isDone()) {if(!future.isDone()){System.out.println("Tâche en cours...");}
Tâche en cours...
try{// Récupération bloquante avec timeoutString result = future.get(2,TimeUnit.SECONDS);System.out.println("Résultat obtenu: "+ result);}catch(InterruptedException|ExecutionException|TimeoutException e){System.err.println("Erreur pendant l'exécution: "+ e.getMessage());}// Arrêt de l'executorexecutor.shutdown();
Résultat obtenu: Résultat du traitement
CompletableFuture en Java (Promises)
CompletableFuture est une classe de Java 8 pour la programmation asynchrone
Permet de chaîner des opérations asynchrones
Supporte la gestion des erreurs et des
Méthodes Principales :
thenApply : Transformation du résultat.
thenCompose : Chaînage de futures.
thenCombine : Combinaison de deux futures.
thenAccept : Consommation du résultat.
thenRun : Exécution d’une tâche après la complétion.
exceptionally : Gestion des erreurs.
get : Récupération bloquante du résultat.
get(timeout, unit) : Récupération avec timeout.
// Création d'un CompletableFuture avec simulation de traitement longCompletableFuture<String> future = CompletableFuture.supplyAsync(()->{try{System.out.println("Début du traitement...");TimeUnit.SECONDS.sleep(5);return"Résultat du traitement";}catch(InterruptedException e){thrownewRuntimeException("Erreur pendant le traitement", e);}});
Début du traitement...
// 1. Traitement non bloquant avec gestion d'erreurfuture.thenAccept(r ->System.out.println("Traitement asynchrone du résultat: "+ r)).exceptionally(error ->{System.err.println("Erreur: "+ error.getMessage());returnnull;});
// 2. Chaînage de transformations non bloquantesCompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{try{TimeUnit.SECONDS.sleep(2);return"Résultat initial dans le thread: "+Thread.currentThread().getId();}catch(InterruptedException e){thrownewRuntimeException("Erreur pendant le traitement", e);}}).thenApply(r ->"Future1: "+ r.toUpperCase());// Création d'un autre CompletableFuture pour démontrer thenCombineCompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->" - Additional Data dans un autre thread: "+Thread.currentThread().getId());// Utilisation de thenCombine pour combiner les résultats des deux futuresCompletableFuture<String> combinedFuture = future1.thenCombine(future2,(result1, result2)-> result1 + result2);// Affichage du résultat combinécombinedFuture.thenAccept(combinedResult ->System.out.println("Résultat combiné dans le thread: "+Thread.currentThread().getId()+" "+combinedResult));
System.out.println("En attente du résultat...");try{// 3. Attente bloquante avec timeoutString result = combinedFuture.get(5,TimeUnit.SECONDS);System.out.println("Résultat obtenu: "+ result);}catch(InterruptedException|ExecutionException e){System.err.println("Erreur pendant l'exécution: "+ e.getMessage());}catch(TimeoutException e){System.err.println("Timeout dépassé: "+ e.getMessage());}
En attente du résultat...
Résultat combiné dans le thread: 41 Future1: RÉSULTAT INITIAL DANS LE THREAD: 41 - Additional Data dans un autre thread: 42
Résultat obtenu: Future1: RÉSULTAT INITIAL DANS LE THREAD: 41 - Additional Data dans un autre thread: 42
ForkJoinPool
Implémentation avancée de ExecutorService pour les tâches de type “divide-and-conquer”.
Améliorer l’efficacité des tâches parallèles en utilisant le “work-stealing”.
Fonctionnement
Work-Stealing : Technique où les threads inactifs volent des tâches des threads occupés pour équilibrer la charge de travail.
Divide-and-Conquer : Diviser une tâche complexe en sous-tâches plus petites, les résoudre en parallèle, puis combiner les résultats.
Nombre de Threads Créés
Nombre de Threads : Le nombre de threads dans un ForkJoinPool est déterminé par le nombre de processeurs disponibles.
Par défaut : ForkJoinPool utilise Runtime.getRuntime().availableProcessors() pour déterminer le nombre de threads.
Personnalisation : Vous pouvez spécifier le nombre de threads lors de la création d’un ForkJoinPool.
Classes Principales
RecursiveTask : Classe abstraite pour les tâches qui retournent un résultat.
RecursiveAction : Classe abstraite pour les tâches qui ne retournent pas de résultat.
int[] array =newint[100];Random random =newRandom();for(int i =0; i < array.length; i++){ array[i]= random.nextInt(100);// Entiers aléatoires entre 0 et 99}ForkJoinPool forkJoinPool =newForkJoinPool();ForkJoinExample task =newForkJoinExample(array,0, array.length);int result = forkJoinPool.invoke(task);System.out.println("Result: "+ result);
Result: 4674
Virtual Threads (Java 21)
Threads légers gérés par la JVM (et non par le système d’exploitation)
Coût mémoire très faible (~1KB vs ~1MB pour un thread platform)
importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Future;importjava.util.concurrent.atomic.AtomicInteger;// Création avec ExecutorServiceExecutorService executor =Executors.newVirtualThreadPerTaskExecutor();AtomicInteger activeThreads =newAtomicInteger(0);try{finalint NB_THREADS =5;// Création de NB_THREADS threads virtuelsList<Future<Integer>> futures = IntStream.range(0, NB_THREADS).mapToObj(i ->{return executor.submit(()->{// Incrémenter le compteur de threads actifsint current = activeThreads.incrementAndGet();System.out.printf("Thread %d démarré. Threads actifs: %d%n", i, current);// Simulation de travailThread.sleep(Duration.ofMillis(100+newRandom().nextInt(900)));// Décrémenter le compteur current = activeThreads.decrementAndGet();System.out.printf("Thread %d terminé. Threads actifs: %d%n", i, current);return i;});}).collect(Collectors.toList());// Attendre la fin de tous les threadsfor(Future<Integer> future : futures){ future.get();}}finally{ executor.shutdown();}System.out.println("Tous les threads sont terminés");
Gestion manuelle des callbacks et des threads : La gestion des callbacks et des threads peut rapidement devenir complexe et difficile à maintenir.
Callback Hell : L’imbrication excessive de callbacks peut rendre le code illisible et difficile à déboguer.
Synchronisation : La synchronisation des threads pour éviter les conditions de course et les deadlocks ajoute une couche de complexité supplémentaire.
Difficulté de Composition
Combinaison d’opérations asynchrones : Il est souvent difficile de combiner et d’enchaîner des opérations asynchrones de manière fluide.
Coordination des tâches : La coordination de plusieurs tâches asynchrones peut être complexe, surtout lorsqu’elles dépendent les unes des autres.
Propagation des résultats : La propagation des résultats entre différentes étapes asynchrones peut être difficile à gérer.
Gestion des Erreurs
Détection des erreurs : La détection des erreurs dans un contexte asynchrone est plus complexe que dans un contexte synchrone.
Propagation des exceptions : La propagation des exceptions à travers les différentes couches de callbacks ou de threads peut être difficile à implémenter correctement.
Gestion centralisée : Il est difficile de centraliser la gestion des erreurs pour les opérations asynchrones, ce qui peut entraîner une duplication du code de gestion des erreurs.
Solutions et Bonnes Pratiques
Utilisation de Frameworks et Bibliothèques
CompletableFuture : Simplifie la gestion des opérations asynchrones et permet de chaîner les opérations de manière fluide.
Project Reactor : Fournit des abstractions pour la programmation réactive, facilitant la composition et la gestion des flux de données asynchrones.
RxJava : Offre des outils pour la programmation réactive, permettant de gérer les opérations asynchrones de manière déclarative.
Gestion des Callbacks
Utilisation de Lambdas : Simplifie la syntaxe des callbacks et améliore la lisibilité du code.
Éviter le Callback Hell : Utiliser des techniques comme les Promises ou les Futures pour éviter l’imbrication excessive des callbacks.
Gestion des Erreurs
Centralisation de la Gestion des Erreurs : Utiliser des mécanismes centralisés pour gérer les erreurs, comme les handlers globaux ou les mécanismes de propagation des exceptions.
Utilisation de Méthodes de Gestion des Erreurs : Utiliser des méthodes comme exceptionally ou handle avec CompletableFuture pour gérer les erreurs de manière fluide.
Composition des Opérations Asynchrones
Chaînage des Opérations : Utiliser des méthodes comme thenApply, thenCompose, et thenCombine pour chaîner les opérations asynchrones.
Coordination des Tâches : Utiliser des outils comme les CompletableFuture ou les flux réactifs pour coordonner les tâches asynchrones de manière efficace.