Stream Java

Java
I111
PO43
Functional
Lecture
Programmation en pipeline (Stream) à partir de Java 8
Auteur
Affiliations

Université de Toulon

LIS UMR CNRS 7020

Date de publication

2024-11-28

Ce notebook présente brièvement la programmation en pipeline (Stream) à partir de Java 8.

La suite sera illustrée sur un exemple simple composé d’une liste de personnes.

record Person(String email, String prenom, String nom, int age) {}

//Immutable List of Records
List<Person> personnes = List.of(
     new Person("pierre.durand@a.fr","Pierre", "Durand", 20), 
     new Person("marie.durand@b.fr","Marie", "Durand", 14),
     new Person("albert.martin@c.fr","Albert", "Martin", 30)
    // ...
);
personnes;
[Person[email=pierre.durand@a.fr, prenom=Pierre, nom=Durand, age=20], Person[email=marie.durand@b.fr, prenom=Marie, nom=Durand, age=14], Person[email=albert.martin@c.fr, prenom=Albert, nom=Martin, age=30]]

Les Streams en Java : une approche fonctionnelle pour traiter les données

Java 8 a introduit les streams, une abstraction puissante pour traiter des collections de données de manière déclarative et fonctionnelle. Contrairement aux collections traditionnelles, un stream ne stocke pas les données mais fournit une vue séquentielle sur une source de données.

Caractéristiques clés des streams :

  • Pipeline de traitement: Les opérations sur un stream sont enchaînées pour former un pipeline. Chaque étape du pipeline transforme le stream en un nouveau stream.
  • Exécution paresseuse: Les opérations ne sont exécutées que lorsque le résultat final est consommé (par exemple, en collectant les éléments dans une liste).
  • Parallélisme: Les streams peuvent être parallélisés pour tirer parti de plusieurs cœurs de processeur.
  • Sources variées: Les streams peuvent être créés à partir de collections, de tableaux, de fichiers, et même générés à la volée.

List<String> words = Arrays.asList("Java", "est", "un", "langage", "puissant");
List<String> uppercaseWords = words.stream()
                                    .map(String::toUpperCase)
                                    .collect(Collectors.toList());
uppercaseWords.forEach(logger::info);
15:18:51.124 [IJava-executor-5] INFO  notebook -- JAVA
15:18:51.126 [IJava-executor-5] INFO  notebook -- EST
15:18:51.126 [IJava-executor-5] INFO  notebook -- UN
15:18:51.126 [IJava-executor-5] INFO  notebook -- LANGAGE
15:18:51.126 [IJava-executor-5] INFO  notebook -- PUISSANT

Dans cet exemple :

  • words.stream() crée un stream à partir de la liste words.
  • map(String::toUpperCase) transforme chaque élément en majuscules.
  • collect(Collectors.toList()) collecte les éléments du stream dans une nouvelle liste.
  • La liste est ensuite traitée en flux avec une lambda expression

Les opérations principales sur les streams :

  • filter: Filtre les éléments selon un prédicat.
  • map: Applique une fonction à chaque élément.
  • reduce: Réduit les éléments à une valeur unique.
  • sorted: Trie les éléments.
  • collect: Collecte les éléments dans une structure de données.

Pourquoi utiliser les streams ?

  • Code plus concis et expressif: Les opérations sur les streams sont souvent plus courtes et plus faciles à comprendre que les boucles traditionnelles.
  • Parallélisme facile à mettre en œuvre: Le parallélisme peut être ajouté simplement en appelant la méthode parallel().
  • Intégration avec les fonctionnalités fonctionnelles de Java 8: Les streams s’intègrent bien avec les lambda expressions et les références de méthode.

Création et Utilisation de Base des Streams

Les streams en Java offrent une manière élégante et expressive de traiter des collections de données. Ils permettent de réaliser des opérations telles que le filtrage, la transformation et la réduction de données de manière déclarative. La création d’un stream peut se faire à partir de diverses sources : collections, tableaux, fichiers, ou encore à l’aide de méthodes de fabrique. Par exemple, pour créer un stream à partir d’une liste, il suffit d’appeler la méthode stream() sur cette liste. Les streams sont particulièrement utiles pour effectuer des traitements en chaîne, en combinant plusieurs opérations les unes à la suite des autres.

Une fois un stream créé, il est possible d’appliquer différentes opérations sur ses éléments. L’une des opérations les plus simples consiste à parcourir tous les éléments du stream et à effectuer une action sur chacun d’eux. C’est le rôle de la méthode forEach. Par exemple, pour afficher en majuscules tous les mots d’une liste, on peut utiliser forEach en combinaison avec la méthode map pour transformer chaque mot en majuscules. Les streams offrent également de nombreuses autres opérations puissantes, telles que filter pour filtrer les éléments, map pour transformer les éléments, reduce pour réduire un stream à une seule valeur, et bien d’autres encore.

personnes.stream().forEach(p->logger.info("{}, {}",p.nom().toUpperCase(),p.prenom()));
15:18:52.258 [IJava-executor-5] INFO  notebook -- DURAND, Pierre
15:18:52.259 [IJava-executor-5] INFO  notebook -- DURAND, Marie
15:18:52.259 [IJava-executor-5] INFO  notebook -- MARTIN, Albert

Les Streams : générateurs 1/4

Cet exemple de code Java utilise un stream d’entiers pour afficher les nombres de 10 à 14 sur la console.

import java.util.stream.IntStream;
IntStream.range(10,15).forEach(System.out::println);
10
11
12
13
14

Les Streams : générateurs 2/4

Cette ligne de code parcourt chaque élément de la liste personnes et affiche chacun d’eux sur la console.

personnes.stream().forEach(System.out::println);
Person[email=pierre.durand@a.fr, prenom=Pierre, nom=Durand, age=20]
Person[email=marie.durand@b.fr, prenom=Marie, nom=Durand, age=14]
Person[email=albert.martin@c.fr, prenom=Albert, nom=Martin, age=30]

Les Streams : générateurs 3/4

Ce code lit chaque ligne du fichier /etc/legal et l’affiche sur la console.

import java.util.stream.Stream;
try (Stream stream = Files.lines(Paths.get("/etc/legal"))) {
            stream.forEach(System.out::println);
}

The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.

Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.

Les Streams : générateurs 4/4

ou les flux d’entrée/sortie (I/O stream)

try (FileReader fileReader = new FileReader("/etc/legal");
  BufferedReader br = new BufferedReader(fileReader)) {
  br.lines().forEach(System.out::println);
}

The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.


Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.

Filtrage

Ce code utilise un prédicat pour filtrer les personnes majeures d’une liste, puis les affiche à l’aide d’un stream.

personnes.stream().filter(p -> p.age() > 18).forEach(System.out::println);
Person[email=pierre.durand@a.fr, prenom=Pierre, nom=Durand, age=20]
Person[email=albert.martin@c.fr, prenom=Albert, nom=Martin, age=30]

Map

Il est aussi possible d’appliquer une Function à chaque élément du flux avec map().

Par exemple pour transformer un Stream<Person> en Stream<String> pour afficher les noms formattés des personnes de plus de 18 ans.

// 1. Créer un stream à partir de la liste "personnes"
personnes.stream()

// 2. Filtrer les personnes majeures (âge supérieur à 18 ans)
.filter(p -> p.age() > 18)

// 3. Transformer chaque personne en une chaîne de caractères formatée
.map(p -> String.format("%s %s", p.nom().toUpperCase(), p.prenom().toLowerCase())) 

// 4. Afficher chaque chaîne de caractères formatée sur la console
.forEach(System.out::println);
DURAND pierre
MARTIN albert

Il est aussi possible de passer d’un flux d’objet à un flux de primitif.

Un ensemble d’opération aggrégations (disposant d’un systeme d’itération interne) est proposé (average, sum, min, max, count, …)

double ageMoyen = personnes.stream()
    .mapToInt(Person::age)
    .average()
    .getAsDouble();
ageMoyen;
21.333333333333332

Exceptions dans les Interfaces Fonctionnelles

Lorsqu’une opération sur un élément d’un stream peut lever une exception, la gestion de cette exception au sein d’une expression lambda peut complexifier le code. En effet, les expressions lambda sont conçues pour être concises et fonctionnelles, et l’ajout de blocs try-catch peut réduire leur lisibilité. De plus, les exceptions vérifiées (celles qui doivent être déclarées dans la signature d’une méthode) ne peuvent pas être directement levées depuis une expression lambda.

Plusieurs stratégies peuvent être envisagées pour gérer ces situations :

  • Envelopper l’exception dans une RuntimeException : Cette approche permet d’éviter de déclarer l’exception dans la signature de la lambda, mais elle peut masquer les erreurs et rendre le débogage plus difficile.
  • Utiliser des méthodes de référence : Si l’opération qui lève l’exception est déjà implémentée dans une méthode, une référence de méthode peut être utilisée pour l’invoquer. Cela évite de réécrire le code de gestion des exceptions.
  • Créer des interfaces fonctionnelles personnalisées : Pour des besoins spécifiques, il est possible de définir des interfaces fonctionnelles qui déclarent des exceptions vérifiées. Cela permet de gérer les exceptions de manière plus explicite.
  • Utiliser des bibliothèques tierces : Certaines bibliothèques proposent des fonctionnalités pour faciliter la gestion des exceptions dans les streams, comme la possibilité de définir des stratégies de récupération ou de propagation des exceptions.
//Production de 10 entiers à raison d'un par 1/10s et calcul de la somme.
IntStream.range(0, 10)
        .map(i -> {try {
                        Thread.sleep(100);
                        } 
                   catch (java.lang.InterruptedException e) {logger.info("{}",e.getMessage());} 
                   return i*2;})
        .sum();
90

IntFunction<Integer> processElement = i -> {
            try {
                Thread.sleep(100);
                return i * 2;
            } catch (InterruptedException e) {
                // Log the exception with details
                logger.info("Error during calculation {}",e.getMessage());              
                return null; // Indicate failure by returning null
            }
        };

IntStream.range(0, 10)
                .mapToObj(processElement)
                .mapToInt(Integer::intValue) // Convert results to int primitives
                .sum();
90

Une solution élégante est d’écrire une classe wrapper qui gère l’exception par exemple en l’encapsulant dans une runtime exception cf. https://www.baeldung.com/java-lambda-exceptions.

Agrégation des Streams

Pour agréger les éléments d’un stream, Java 8 introduit les méthodes reduce() et collect(). Bien qu’elles offrent une grande flexibilité, leur utilisation peut s’avérer complexe pour les débutants. C’est pourquoi, dans la plupart des cas, les méthodes utilitaires de la classe Collectors sont préférables. Ces méthodes fournissent des solutions prêtes à l’emploi pour des opérations d’agrégation courantes comme le calcul de sommes, de moyennes, le regroupement d’éléments, etc.

Production d’une collection à partir d’un Stream

import java.util.stream.Collectors;

List<String> AdultNames = personnes.stream()
    .filter(p -> p.age() > 18)
    .map(p-> "%s %s".formatted(p.nom().toUpperCase(), p.prenom().toLowerCase()))
    .collect(Collectors.toList());

AdultNames;
[DURAND pierre, MARTIN albert]

Opérations de groupement

//Regrouper les prénoms par nom dans une Map.
Map<String, List<String>> PersonfirstnamesGroupedByNames = personnes.stream()
        .collect(
            Collectors.groupingBy(
                Person::nom,                      
                Collectors.mapping(
                    Person::prenom,
                    Collectors.toList())));
PersonfirstnamesGroupedByNames;
{Durand=[Pierre, Marie], Martin=[Albert]}

Concaténation en String

personnes.stream()
    .filter(p -> p.age() > 18)
    .map(p-> "%s %s".formatted(p.nom().toUpperCase(), p.prenom().toLowerCase()))
    .collect(Collectors.joining(",","-->","<--"));
-->DURAND pierre,MARTIN albert<--

Parallélisme des Streams

  • Avantages:
    • Accélération des traitements sur de grands ensembles de données
    • Meilleure utilisation des ressources matérielles
  • Inconvénients:
    • Coût de gestion des threads
    • Complexité accrue du code
  • Quand utiliser ?
    • Lorsque les opérations sur les éléments sont indépendantes
    • Pour des jeux de données volumineux
    • Lorsque les gains de performance sont significatifs par rapport au coût

Le parallélisme des streams permet d’accélérer les traitements en répartissant les calculs sur plusieurs threads. Cependant, cette approche n’est pas sans conséquences. La création et la gestion de threads engendrent un coût en termes de ressources système. De plus, la complexité du code peut augmenter, notamment lorsqu’il s’agit de synchroniser les différentes tâches parallèles. Il est donc essentiel d’évaluer soigneusement les avantages et les inconvénients du parallélisme avant de l’utiliser.

Pour tirer pleinement parti du parallélisme, il est recommandé d’appliquer cette technique à des opérations indépendantes sur des éléments d’un stream. Les traitements longs et les grands volumes de données sont des cas d’utilisation typiques. Il est également important de mesurer les gains de performance réels obtenus avec le parallélisme, afin de s’assurer que les coûts associés sont justifiés. Dans l’exemple ci-dessus, la méthode parallelStream() est utilisée pour créer un stream parallèle. La méthode map() applique une fonction à chaque élément du stream, simulant un traitement long avec la méthode sleep(). Enfin, la méthode sum() réduit le stream à une valeur unique en calculant la somme des éléments.

Dans les exemples ci-dessous une pause artificielle est ajoutée pour favoriser le parallelisme. cf. https://www.baeldung.com/java-when-to-use-parallel-stream


import java.util.function.BiFunction;

BiFunction<Random, Integer, Stream<Person>> personGenerator=(random, size)->IntStream.range(0, /* limit */ size) //TRICK TO KNOW THE SIZE OF THE STREAM
        .mapToObj(unused -> {try {Thread.sleep(10);} catch (java.lang.InterruptedException e) {};
                             return new Person("pierre.durand@a.fr","Pierre", "Durand", random.nextInt(100));});

long seed = System.currentTimeMillis(); // Get current time in milliseconds
Random random = new Random(seed);

double ageMoyen;
long startTime = System.nanoTime();
ageMoyen = personGenerator.apply(random, 200) 
        .mapToInt(Person::age)
        .average()
        .getAsDouble(); //Il s'agit d'un Optionnal
long endTime = System.nanoTime();
System.out.println("moyenne calculée %1.0f en %dms".formatted(ageMoyen, (endTime - startTime)/1000000));
moyenne calculée 48 en 2333ms

Random random = new Random(seed);

double ageMoyen;
long startTime = System.nanoTime();
ageMoyen =  personGenerator.apply(random, 200)
        .parallel()
        .mapToInt(Person::age)
        .average()
        .getAsDouble(); //Il s'agit d'un Optionnal
long endTime = System.nanoTime();
System.out.println("moyenne calculée %1.0f en %dms".formatted(ageMoyen, (endTime - startTime)/1000000));
moyenne calculée 48 en 330ms

Les streams parallèles en Java exploitent le modèle de concurrence ForkJoin pour paralléliser les traitements sur les collections. Le framework ForkJoin divise récursivement les tâches en sous-tâches jusqu’à un niveau de granularité optimal, puis les exécute en parallèle. Cette approche permet de tirer pleinement parti des ressources multi-cœurs des machines modernes.

ForkJoinPool pool = new ForkJoinPool();
ystem.out.println("Parallelism = " +pool.getParallelism()); 
Parallelism = 8
public static class AgeSumTask extends RecursiveTask<Long> {
    private final List<Person> persons;
    private final int start;
    private final int end;

    public AgeSumTask(List<Person> persons, int start, int end) {
        this.persons = persons;
        this.start = start;
        this.end = end;
    }

    protected Long compute() {
        if (end - start <= 10_000) { // Seuil pour le découpage
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += persons.get(i).age();
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            AgeSumTask left = new AgeSumTask(persons, start, middle);
            AgeSumTask right = new AgeSumTask(persons, middle, end);
            left.fork();
            long rightResult = right.compute();
            long leftResult = left.join();
            return leftResult + rightResult;
        }
    }

    }

    public static double calculateAverageAge(List<Person> persons) {
        ForkJoinPool pool = new ForkJoinPool();
        AgeSumTask task = new AgeSumTask(persons, 0, persons.size());
        long totalAge = pool.invoke(task);
        return (double) totalAge / persons.size();
}

double averageAge = calculateAverageAge(
        personnes);System.out.println("La moyenne d'âge est de %1.0f an(s).".formatted(averageAge));
La moyenne d'âge est de 21 an(s).

Réutilisation