Stream Java

Université de Toulon

LIS UMR CNRS 7020

2024-12-04

Ce notebook présente brièvement la programmation en pipeline (Stream) à partir de Java 8.

La suite sera illustrée sur un exemple simple composé d’une liste de personnes.


record Person(String email, String prenom, String nom, int age) {}

//Immutable List of Records
final List<Person> personnes = List.of(
     new Person("pierre.durand@a.fr","Pierre", "Durand", 20), 
     new Person("marie.durand@b.fr","Marie", "Durand", 14),
     new Person("albert.martin@c.fr","Albert", "Martin", 30)
    // ...
);
personnes;
[Person[email=pierre.durand@a.fr, prenom=Pierre, nom=Durand, age=20], Person[email=marie.durand@b.fr, prenom=Marie, nom=Durand, age=14], Person[email=albert.martin@c.fr, prenom=Albert, nom=Martin, age=30]]

Les Streams en Java : une approche fonctionnelle pour traiter les données

  • Qu’est-ce qu’un Stream ? Une vue séquentielle sur des données (collections, fichiers, …).
  • Pourquoi les utiliser ?
    • Code plus concis et expressif
    • Parallélisme facile
    • Intégration avec les fonctionnalités fonctionnelles
  • Opérations clés: filter, map, reduce, sorted, collect
List<String> words = Arrays.asList("Java", "est", "un", "langage", "puissant");
List<String> uppercaseWords = words.stream()
                                    .map(String::toUpperCase)
                                    .collect(Collectors.toList());
uppercaseWords.forEach(logger::info);
15:21:49.220 [IJava-executor-7] INFO  notebook -- JAVA
15:21:49.235 [IJava-executor-7] INFO  notebook -- EST
15:21:49.236 [IJava-executor-7] INFO  notebook -- UN
15:21:49.236 [IJava-executor-7] INFO  notebook -- LANGAGE
15:21:49.236 [IJava-executor-7] INFO  notebook -- PUISSANT

Dans cet exemple :

  • words.stream() crée un stream à partir de la liste words.
  • map(String::toUpperCase) transforme chaque élément en majuscules.
  • collect(Collectors.toList()) collecte les éléments du stream dans une nouvelle liste.
  • La liste est ensuite traitée en flux avec une lambda expression

Pourquoi utiliser les streams ?

  • Code plus concis et expressif: Les opérations sur les streams sont souvent plus courtes et plus faciles à comprendre que les boucles traditionnelles.
  • Parallélisme facile à mettre en œuvre: Le parallélisme peut être ajouté simplement en appelant la méthode parallel().
  • Intégration avec les fonctionnalités fonctionnelles de Java 8: Les streams s’intègrent bien avec les lambda expressions et les références de méthode.

Création et Utilisation de Base des Streams

  • Création:
    • À partir d’une collection : collection.stream()
    • À partir d’un tableau : Arrays.stream(tableau)
    • À partir d’un fichier : BufferedReader.lines()
    • À partir de factories : Stream.of(), IntStream.range()

Un Consummer peut être appliqué à chaque élément d’un stream foreach.

personnes.stream().forEach(p->logger.info("{}, {}",
                                p.nom().toUpperCase(),
                                p.prenom()));
15:28:55.382 [IJava-executor-0] INFO  notebook -- DURAND, Pierre
15:28:55.392 [IJava-executor-0] INFO  notebook -- DURAND, Marie
15:28:55.393 [IJava-executor-0] INFO  notebook -- MARTIN, Albert

Les Streams : générateurs 1/4

Cet exemple de code Java utilise un stream d’entiers pour afficher les nombres de 10 à 14 sur la console.

import java.util.stream.IntStream;
IntStream.range(10,15)
    .forEach(System.out::println);
10
11
12
13
14

Les Streams : générateurs 2/4

Cette ligne de code parcourt chaque élément de la liste personnes et affiche chacun d’eux sur la console.

personnes.stream()
    .forEach(System.out::println);
Person[email=pierre.durand@a.fr, prenom=Pierre, nom=Durand, age=20]
Person[email=marie.durand@b.fr, prenom=Marie, nom=Durand, age=14]
Person[email=albert.martin@c.fr, prenom=Albert, nom=Martin, age=30]

Les Streams : générateurs 3/4

Ce code lit chaque ligne du fichier /etc/legal et l’affiche sur la console.

import java.util.stream.Stream;
try (Stream stream = Files.lines(Paths.get("/etc/legal"))) {
            stream.forEach(System.out::println);
}

The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.

Les Streams : générateurs 4/4

ou les flux d’entrée/sortie (I/O stream)

try (FileReader fileReader = new FileReader("/etc/legal");
  BufferedReader br = new BufferedReader(fileReader)) {
  br.lines().forEach(System.out::println);
}

The programs included with the Ubuntu system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
applicable law.

Filtrage

Ce code utilise un prédicat pour filtrer les personnes majeures d’une liste, puis les affiche à l’aide d’un stream.

personnes.stream()
    .filter(p -> p.age() > 18)
    .forEach(System.out::println);
Person[email=pierre.durand@a.fr, prenom=Pierre, nom=Durand, age=20]
Person[email=albert.martin@c.fr, prenom=Albert, nom=Martin, age=30]

Map

Il est aussi possible d’appliquer une Function à chaque élément du flux avec map().

Par exemple pour transformer un Stream<Person> en Stream<String> pour afficher les noms formattés des personnes de plus de 18 ans.

// 1. Créer un stream à partir de la liste "personnes"
personnes.stream()

    // 2. Filtrer les personnes majeures (âge supérieur à 18 ans)
    .filter(p -> p.age() > 18)

    // 3. Transformer chaque personne en une chaîne de caractères formatée
    .map(p -> String.format("%s %s", p.nom().toUpperCase(), p.prenom().toLowerCase())) 

    // 4. Afficher chaque chaîne de caractères formatée sur la console
    .forEach(System.out::println);
DURAND pierre
MARTIN albert

Il est aussi possible de passer d’un flux d’objet à un flux de primitif.

Un ensemble d’opération aggrégations (disposant d’un systeme d’itération interne) est proposé (average, sum, min, max, count, …)

double ageMoyen = personnes.stream()
    .mapToInt(Person::age)
    .average()
    .getAsDouble();
ageMoyen;
21.333333333333332

Exceptions dans les Interfaces Fonctionnelles

  • Problématique :
    • Les exceptions vérifiées ne peuvent être directement levées dans une expression lambda.
    • Les blocs try-catch peuvent alourdir le code.
  • Solutions :
    • Envelopper dans une RuntimeException (à utiliser avec précaution)
    • Utiliser des références de méthode
    • Créer des interfaces fonctionnelles personnalisées
    • Utiliser des bibliothèques tierces
//Production de 10 entiers à raison d'un par 1/10s et calcul de la somme.
IntStream.range(0, 10)
        .map(i -> {try {
                        Thread.sleep(100);
                        } 
                   catch (java.lang.InterruptedException e) {logger.info("{}",e.getMessage());} 
                   return i*2;})
        .sum();
90
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;
IntFunction<Integer> processElement = i -> {
            try {
                Thread.sleep(100);
                return i * 2;
            } catch (InterruptedException e) {
                // Log the exception with details
                logger.info("Error during calculation {}",e.getMessage());              
                return null; // Indicate failure by returning null
            }
        };

IntStream.range(0, 10)
                .mapToObj(processElement)
                .mapToInt(Integer::intValue) // Convert results to int primitives
                .sum();
90

Une solution élégante est d’écrire une classe wrapper qui gère l’exception par exemple en l’encapsulant dans une runtime exception cf. https://www.baeldung.com/java-lambda-exceptions.

Agrégation des Streams

  • Méthodes générales : reduce() et collect()
    • Grande flexibilité
    • Complexité potentielle
  • Méthodes utilitaires : Collectors
    • Solutions prêtes à l’emploi

    • Opérations courantes :

      • Somme : Collectors.summingInt()
      • Moyenne : Collectors.averagingDouble()
      • Regroupement : Collectors.groupingBy()
    • Exemple :

      List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
      int sum = numbers.stream().collect(Collectors.summingInt(Integer::intValue));

Production d’une collection à partir d’un Stream

import java.util.stream.Collectors;

List<String> AdultNames = personnes.stream()
    .filter(p -> p.age() > 18)
    .map(p-> "%s %s".formatted(p.nom().toUpperCase(), 
                               p.prenom().toLowerCase()))
    .collect(Collectors.toList());

AdultNames;
[DURAND pierre, MARTIN albert]

Opérations de groupement

//Regrouper les prénoms par nom dans une Map.
Map<String, List<String>> PersonfirstnamesGroupedByNames = 
    personnes.stream()
        .collect(
            Collectors.groupingBy(
                Person::nom,                      
                Collectors.mapping(
                    Person::prenom,
                    Collectors.toList())));
                    
PersonfirstnamesGroupedByNames;
{Durand=[Pierre, Marie], Martin=[Albert]}

Concaténation en String

personnes.stream()
    .filter(p -> p.age() > 18)
    .map(p-> "%s %s".formatted(p.nom().toUpperCase(), 
                               p.prenom().toLowerCase()))
    .collect(Collectors.joining(",","-->","<--"));
-->DURAND pierre,MARTIN albert<--

Parallélisme des Streams

Dans les exemples ci-dessous une pause artificielle est ajoutée pour favoriser le parallelisme. cf. https://www.baeldung.com/java-when-to-use-parallel-stream

import java.util.function.BiFunction;

BiFunction<Random, Integer, Stream<Person>> personGenerator=(random, size)->IntStream.range(0, /* limit */ size) //TRICK TO KNOW THE SIZE OF THE STREAM
        .mapToObj(unused -> {try {Thread.sleep(10);} catch (java.lang.InterruptedException e) {};
                             return new Person("pierre.durand@a.fr","Pierre", "Durand", random.nextInt(100));});

long seed = System.currentTimeMillis(); // Get current time in milliseconds
Random random = new Random(seed);

double ageMoyen;
long startTime = System.nanoTime();
ageMoyen = personGenerator.apply(random, 200) 
        .mapToInt(Person::age)
        .average()
        .getAsDouble(); //Il s'agit d'un Optionnal
long endTime = System.nanoTime();
System.out.println("moyenne calculée %1.0f en %dms".formatted(ageMoyen, (endTime - startTime)/1000000));
moyenne calculée 47 en 2463ms
Random random = new Random(seed);

double ageMoyen;
long startTime = System.nanoTime();
ageMoyen =  personGenerator.apply(random, 200)
        .parallel()
        .mapToInt(Person::age)
        .average()
        .getAsDouble(); //Il s'agit d'un Optionnal
long endTime = System.nanoTime();
System.out.println("moyenne calculée %1.0f en %dms".formatted(ageMoyen, (endTime - startTime)/1000000));
moyenne calculée 47 en 318ms

Les streams parallèles en Java exploitent le modèle de concurrence ForkJoin pour paralléliser les traitements sur les collections. Le framework ForkJoin divise récursivement les tâches en sous-tâches jusqu’à un niveau de granularité optimal, puis les exécute en parallèle. Cette approche permet de tirer pleinement parti des ressources multi-cœurs des machines modernes.

ForkJoinPool pool = new ForkJoinPool();
System.out.println("Parallelism = " +pool.getParallelism()); 
Parallelism = 8
public static class AgeSumTask extends RecursiveTask<Long> {
    private final List<Person> persons;
    private final int start;
    private final int end;

    public AgeSumTask(List<Person> persons, int start, int end) {
        this.persons = persons;
        this.start = start;
        this.end = end;
    }

    protected Long compute() {
        if (end - start <= 10_000) { // Seuil pour le découpage
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += persons.get(i).age();
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            AgeSumTask left = new AgeSumTask(persons, start, middle);
            AgeSumTask right = new AgeSumTask(persons, middle, end);
            left.fork();
            long rightResult = right.compute();
            long leftResult = left.join();
            return leftResult + rightResult;
        }
    }

    }

    public static double calculateAverageAge(List<Person> persons) {
        ForkJoinPool pool = new ForkJoinPool();
        AgeSumTask task = new AgeSumTask(persons, 0, persons.size());
        long totalAge = pool.invoke(task);
        return (double) totalAge / persons.size();
}

double averageAge = calculateAverageAge(
        personnes);System.out.println("La moyenne d'âge est de %1.0f an(s).".formatted(averageAge));
La moyenne d'âge est de 21 an(s).

Stream API Enhancements

  • Stream API Enhancements (Java 9+)

  • takeWhile() et dropWhile()


Stream.of(1, 2, 3, 4, 1, 2, 3)
    .takeWhile(n -> n < 3)    // [1, 2]
    .forEach(System.out::println);

Stream.of(1, 2, 3, 4, 1, 2, 3)
    .dropWhile(n -> n < 3)    // [3, 4, 1, 2, 3]
    .forEach(System.out::println);
1
2
3
4
1
2
3

iterate() avec prédicat


Stream.iterate(0, n -> n < 10, n -> n + 2)
    .forEach(System.out::println);

// Crée un Stream vide si null, sinon Stream d'un élément
Stream.ofNullable(null).count();    // 0
Stream.ofNullable("value").count(); // 1

//Optional to Stream
Optional.of("value")
    .stream()
    .map(String::toUpperCase)
    .forEach(System.out::println);
0
2
4
6
8
VALUE

Collectors améliorés

// filtering
// Create a list of integers from 1 to 9
List<Integer> evenNums = IntStream.range(1, 10)
    .boxed() // Convert IntStream to Stream<Integer>
    .collect(Collectors.filtering(
        n -> n % 2 == 0, // Filter to keep only even numbers
        Collectors.toList() // Collect the results into a List
    ));

evenNums;
[2, 4, 6, 8]
// Définition d'une personne avec son nom et sa liste de téléphones
record Person(String nom, List<String> phones) {}

// Jeu de données avec doublons intentionnels (2 "Bob")
List<Person> persons = List.of(
    new Person("Alice", List.of("123", "456")),
    new Person("Bob", List.of("789")),
    new Person("Bob", List.of("555"))
);

// Regroupement et aplatissement des téléphones par nom:
// 1. groupingBy: crée un Map avec les noms comme clés
// 2. flatMapping: fusionne toutes les listes de téléphones en une seule liste
Map<String, List<String>> phonesByName = persons.stream()
    .collect(Collectors.groupingBy(
        Person::nom,                          // Groupe par nom
        Collectors.flatMapping(               // Pour chaque groupe:
            p -> p.phones().stream(),         // - transforme la liste en stream
            Collectors.toList()               // - collecte tous les numéros
        ))
    );

// Affiche: {Bob=[789, 555], Alice=[123, 456]}
System.out.println(phonesByName);
{Bob=[789, 555], Alice=[123, 456]}