Ce notebook présente brièvement la programmation en pipeline (Stream) à partir de Java 8.
La suite sera illustrée sur un exemple simple composé d’une liste de personnes.
recordPerson(String email,String prenom,String nom,int age){}//Immutable List of RecordsList<Person> personnes =List.of(newPerson("pierre.durand@a.fr","Pierre","Durand",20),newPerson("marie.durand@b.fr","Marie","Durand",14),newPerson("albert.martin@c.fr","Albert","Martin",30)// ...);personnes;
Les Streams en Java : une approche fonctionnelle pour traiter les données
Java 8 a introduit les streams, une abstraction puissante pour traiter des collections de données de manière déclarative et fonctionnelle. Contrairement aux collections traditionnelles, un stream ne stocke pas les données mais fournit une vue séquentielle sur une source de données.
Caractéristiques clés des streams :
Pipeline de traitement: Les opérations sur un stream sont enchaînées pour former un pipeline. Chaque étape du pipeline transforme le stream en un nouveau stream.
Exécution paresseuse: Les opérations ne sont exécutées que lorsque le résultat final est consommé (par exemple, en collectant les éléments dans une liste).
Parallélisme: Les streams peuvent être parallélisés pour tirer parti de plusieurs cœurs de processeur.
Sources variées: Les streams peuvent être créés à partir de collections, de tableaux, de fichiers, et même générés à la volée.
List<String> words =Arrays.asList("Java","est","un","langage","puissant");List<String> uppercaseWords = words.stream().map(String::toUpperCase).collect(Collectors.toList());uppercaseWords.forEach(logger::info);
15:18:51.124 [IJava-executor-5] INFO notebook -- JAVA
15:18:51.126 [IJava-executor-5] INFO notebook -- EST
15:18:51.126 [IJava-executor-5] INFO notebook -- UN
15:18:51.126 [IJava-executor-5] INFO notebook -- LANGAGE
15:18:51.126 [IJava-executor-5] INFO notebook -- PUISSANT
Dans cet exemple :
words.stream() crée un stream à partir de la liste words.
map(String::toUpperCase) transforme chaque élément en majuscules.
collect(Collectors.toList()) collecte les éléments du stream dans une nouvelle liste.
La liste est ensuite traitée en flux avec une lambda expression
Les opérations principales sur les streams :
filter: Filtre les éléments selon un prédicat.
map: Applique une fonction à chaque élément.
reduce: Réduit les éléments à une valeur unique.
sorted: Trie les éléments.
collect: Collecte les éléments dans une structure de données.
Pourquoi utiliser les streams ?
Code plus concis et expressif: Les opérations sur les streams sont souvent plus courtes et plus faciles à comprendre que les boucles traditionnelles.
Parallélisme facile à mettre en œuvre: Le parallélisme peut être ajouté simplement en appelant la méthode parallel().
Intégration avec les fonctionnalités fonctionnelles de Java 8: Les streams s’intègrent bien avec les lambda expressions et les références de méthode.
Création et Utilisation de Base des Streams
Les streams en Java offrent une manière élégante et expressive de traiter des collections de données. Ils permettent de réaliser des opérations telles que le filtrage, la transformation et la réduction de données de manière déclarative. La création d’un stream peut se faire à partir de diverses sources : collections, tableaux, fichiers, ou encore à l’aide de méthodes de fabrique. Par exemple, pour créer un stream à partir d’une liste, il suffit d’appeler la méthode stream() sur cette liste. Les streams sont particulièrement utiles pour effectuer des traitements en chaîne, en combinant plusieurs opérations les unes à la suite des autres.
Une fois un stream créé, il est possible d’appliquer différentes opérations sur ses éléments. L’une des opérations les plus simples consiste à parcourir tous les éléments du stream et à effectuer une action sur chacun d’eux. C’est le rôle de la méthode forEach. Par exemple, pour afficher en majuscules tous les mots d’une liste, on peut utiliser forEach en combinaison avec la méthode map pour transformer chaque mot en majuscules. Les streams offrent également de nombreuses autres opérations puissantes, telles que filter pour filtrer les éléments, map pour transformer les éléments, reduce pour réduire un stream à une seule valeur, et bien d’autres encore.
15:18:52.258 [IJava-executor-5] INFO notebook -- DURAND, Pierre
15:18:52.259 [IJava-executor-5] INFO notebook -- DURAND, Marie
15:18:52.259 [IJava-executor-5] INFO notebook -- MARTIN, Albert
Les Streams : générateurs 1/4
Cet exemple de code Java utilise un stream d’entiers pour afficher les nombres de 10 à 14 sur la console.
The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.
Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.
Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.
The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.
Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.
Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.
Filtrage
Ce code utilise un prédicat pour filtrer les personnes majeures d’une liste, puis les affiche à l’aide d’un stream.
Il est aussi possible d’appliquer une Function à chaque élément du flux avec map().
Par exemple pour transformer un Stream<Person> en Stream<String> pour afficher les noms formattés des personnes de plus de 18 ans.
// 1. Créer un stream à partir de la liste "personnes"personnes.stream()// 2. Filtrer les personnes majeures (âge supérieur à 18 ans).filter(p -> p.age()>18)// 3. Transformer chaque personne en une chaîne de caractères formatée.map(p ->String.format("%s%s", p.nom().toUpperCase(), p.prenom().toLowerCase()))// 4. Afficher chaque chaîne de caractères formatée sur la console.forEach(System.out::println);
DURAND pierre
MARTIN albert
Il est aussi possible de passer d’un flux d’objet à un flux de primitif.
Un ensemble d’opération aggrégations (disposant d’un systeme d’itération interne) est proposé (average, sum, min, max, count, …)
Lorsqu’une opération sur un élément d’un stream peut lever une exception, la gestion de cette exception au sein d’une expression lambda peut complexifier le code. En effet, les expressions lambda sont conçues pour être concises et fonctionnelles, et l’ajout de blocs try-catch peut réduire leur lisibilité. De plus, les exceptions vérifiées (celles qui doivent être déclarées dans la signature d’une méthode) ne peuvent pas être directement levées depuis une expression lambda.
Plusieurs stratégies peuvent être envisagées pour gérer ces situations :
Envelopper l’exception dans une RuntimeException : Cette approche permet d’éviter de déclarer l’exception dans la signature de la lambda, mais elle peut masquer les erreurs et rendre le débogage plus difficile.
Utiliser des méthodes de référence : Si l’opération qui lève l’exception est déjà implémentée dans une méthode, une référence de méthode peut être utilisée pour l’invoquer. Cela évite de réécrire le code de gestion des exceptions.
Créer des interfaces fonctionnelles personnalisées : Pour des besoins spécifiques, il est possible de définir des interfaces fonctionnelles qui déclarent des exceptions vérifiées. Cela permet de gérer les exceptions de manière plus explicite.
Utiliser des bibliothèques tierces : Certaines bibliothèques proposent des fonctionnalités pour faciliter la gestion des exceptions dans les streams, comme la possibilité de définir des stratégies de récupération ou de propagation des exceptions.
//Production de 10 entiers à raison d'un par 1/10s et calcul de la somme.IntStream.range(0,10).map(i ->{try{Thread.sleep(100);}catch(java.lang.InterruptedException e){logger.info("{}",e.getMessage());}return i*2;}).sum();
90
IntFunction<Integer> processElement = i ->{try{Thread.sleep(100);return i *2;}catch(InterruptedException e){// Log the exception with details logger.info("Error during calculation {}",e.getMessage());returnnull;// Indicate failure by returning null}};IntStream.range(0,10).mapToObj(processElement).mapToInt(Integer::intValue)// Convert results to int primitives.sum();
90
Une solution élégante est d’écrire une classe wrapper qui gère l’exception par exemple en l’encapsulant dans une runtime exception cf. https://www.baeldung.com/java-lambda-exceptions.
Agrégation des Streams
Pour agréger les éléments d’un stream, Java 8 introduit les méthodes reduce() et collect(). Bien qu’elles offrent une grande flexibilité, leur utilisation peut s’avérer complexe pour les débutants. C’est pourquoi, dans la plupart des cas, les méthodes utilitaires de la classe Collectors sont préférables. Ces méthodes fournissent des solutions prêtes à l’emploi pour des opérations d’agrégation courantes comme le calcul de sommes, de moyennes, le regroupement d’éléments, etc.
//Regrouper les prénoms par nom dans une Map.Map<String,List<String>> PersonfirstnamesGroupedByNames = personnes.stream().collect( Collectors.groupingBy( Person::nom, Collectors.mapping( Person::prenom, Collectors.toList())));PersonfirstnamesGroupedByNames;
Accélération des traitements sur de grands ensembles de données
Meilleure utilisation des ressources matérielles
Inconvénients:
Coût de gestion des threads
Complexité accrue du code
Quand utiliser ?
Lorsque les opérations sur les éléments sont indépendantes
Pour des jeux de données volumineux
Lorsque les gains de performance sont significatifs par rapport au coût
Le parallélisme des streams permet d’accélérer les traitements en répartissant les calculs sur plusieurs threads. Cependant, cette approche n’est pas sans conséquences. La création et la gestion de threads engendrent un coût en termes de ressources système. De plus, la complexité du code peut augmenter, notamment lorsqu’il s’agit de synchroniser les différentes tâches parallèles. Il est donc essentiel d’évaluer soigneusement les avantages et les inconvénients du parallélisme avant de l’utiliser.
Pour tirer pleinement parti du parallélisme, il est recommandé d’appliquer cette technique à des opérations indépendantes sur des éléments d’un stream. Les traitements longs et les grands volumes de données sont des cas d’utilisation typiques. Il est également important de mesurer les gains de performance réels obtenus avec le parallélisme, afin de s’assurer que les coûts associés sont justifiés. Dans l’exemple ci-dessus, la méthode parallelStream() est utilisée pour créer un stream parallèle. La méthode map() applique une fonction à chaque élément du stream, simulant un traitement long avec la méthode sleep(). Enfin, la méthode sum() réduit le stream à une valeur unique en calculant la somme des éléments.
Dans les exemples ci-dessous une pause artificielle est ajoutée pour favoriser le parallelisme. cf. https://www.baeldung.com/java-when-to-use-parallel-stream
importjava.util.function.BiFunction;BiFunction<Random,Integer, Stream<Person>> personGenerator=(random, size)->IntStream.range(0,/* limit */ size)//TRICK TO KNOW THE SIZE OF THE STREAM.mapToObj(unused ->{try{Thread.sleep(10);}catch(java.lang.InterruptedException e){};returnnewPerson("pierre.durand@a.fr","Pierre","Durand", random.nextInt(100));});long seed =System.currentTimeMillis();// Get current time in millisecondsRandom random =newRandom(seed);double ageMoyen;long startTime =System.nanoTime();ageMoyen = personGenerator.apply(random,200).mapToInt(Person::age).average().getAsDouble();//Il s'agit d'un Optionnallong endTime =System.nanoTime();System.out.println("moyenne calculée %1.0f en %dms".formatted(ageMoyen,(endTime - startTime)/1000000));
moyenne calculée 48 en 2333ms
Random random =newRandom(seed);double ageMoyen;long startTime =System.nanoTime();ageMoyen = personGenerator.apply(random,200).parallel().mapToInt(Person::age).average().getAsDouble();//Il s'agit d'un Optionnallong endTime =System.nanoTime();System.out.println("moyenne calculée %1.0f en %dms".formatted(ageMoyen,(endTime - startTime)/1000000));
moyenne calculée 48 en 330ms
Les streams parallèles en Java exploitent le modèle de concurrence ForkJoin pour paralléliser les traitements sur les collections. Le framework ForkJoin divise récursivement les tâches en sous-tâches jusqu’à un niveau de granularité optimal, puis les exécute en parallèle. Cette approche permet de tirer pleinement parti des ressources multi-cœurs des machines modernes.
ForkJoinPool pool =newForkJoinPool();ystem.out.println("Parallelism = "+pool.getParallelism());
Parallelism = 8
publicstaticclass AgeSumTask extends RecursiveTask<Long>{privatefinalList<Person> persons;privatefinalint start;privatefinalint end;publicAgeSumTask(List<Person> persons,int start,int end){this.persons= persons;this.start= start;this.end= end;}protectedLongcompute(){if(end - start <=10_000){// Seuil pour le découpagelong sum =0;for(int i = start; i < end; i++){ sum += persons.get(i).age();}return sum;}else{int middle =(start + end)/2; AgeSumTask left =newAgeSumTask(persons, start, middle); AgeSumTask right =newAgeSumTask(persons, middle, end); left.fork();long rightResult = right.compute();long leftResult = left.join();return leftResult + rightResult;}}}publicstaticdoublecalculateAverageAge(List<Person> persons){ ForkJoinPool pool =newForkJoinPool(); AgeSumTask task =newAgeSumTask(persons,0, persons.size());long totalAge = pool.invoke(task);return(double) totalAge / persons.size();}double averageAge =calculateAverageAge( personnes);System.out.println("La moyenne d'âge est de %1.0f an(s).".formatted(averageAge));