2024-11-27
Programmation réactive: Un paradigme de programmation orienté vers les flux de données et la propagation des changements.
Réactivité: Capacité d’un système à réagir aux événements en temps réel.
Les principes de la programmation réactive
6
12
18
reactor.core.publisher.LambdaSubscriber@289fec35
Ajouter la dépendance Maven suivante à votre fichier pom.xml
:
Mono
et Flux
implantent l’interface PublisherFlux.just(1, 2, 3)
Flux.fromArray(array)
Flux.fromIterable(list)
Flux.fromStream(stream)
Flux.range(1, 10)
: Séquence de nombresFlux.interval(Duration)
: Émission périodiqueFlux.generate()
: Génération synchroneFlux.create()
: Génération avec backpressureFlux.interval(Duration.ofSeconds(1))
Flux.timer(Duration.ofMillis(500))
Flux.create(sink -> {...})
Flux.push(sink -> {...})
Mono.fromFuture(future)
Flux.fromCompletionStage(stage)
Filtrage Simple et Déduplication - filter
: Filtre selon prédicat - filter(x -> x > 0)
- filter(String::isNotEmpty)
distinct
: Élimine doublons
distinctUntilChanged
Limitation - take(n)
: n premiers éléments - takeLast(n)
: n derniers - takeWhile
: condition vraie - takeUntil
: jusqu’à condition
Saut d’Éléments - skip(n)
: Ignore n premiers - skipLast(n)
: Ignore n derniers - skipWhile
: Ignore si vrai - skipUntil
: Ignore jusqu’à vrai
Sélection Spécifique - elementAt
: Position précise - next
: Premier élément - last
: Dernier élément - single
: Vérifie unicité
Fusion de Flux
merge
: Combine en intercalant
concat
: Concatène séquentiellement
mergeSequential
: Fusion ordonnée
Combinaison Synchronisée
zip
: Combine par paires
combineLatest
: Dernières valeurs
Utilitaires
withLatestFrom
: EnrichissementstartWith
: Valeurs initialessample
: Échantillonnage// Combinaison des flux avec merge
// L'opérateur merge combine les flux en intercalant les éléments des deux flux
// Ensuite, on filtre les éléments pour ne garder que ceux qui sont divisibles par 4
Flux<Integer> mergedFlux = Flux.merge(flux1, flux2)
.filter(i -> i % 4 == 0); // Filtrage des éléments divisibles par 4
// Abonnement au flux combiné pour imprimer les valeurs filtrées
mergedFlux.subscribe(System.out::println);
4
8
12
reactor.core.publisher.LambdaSubscriber@67337d67
// Combinaison des flux avec concat
// L'opérateur concat combine les flux en les concaténant
// Ensuite, on filtre les éléments pour ne garder que ceux qui sont divisibles par 4
Flux<Integer> concatenatedFlux = Flux.concat(flux1, flux2)
.filter(i -> i % 4 == 0); // Filtrage des éléments divisibles par 4
// Abonnement au flux combiné pour imprimer les valeurs filtrées
concatenatedFlux.subscribe(System.out::println);
4
8
12
reactor.core.publisher.LambdaSubscriber@34fbf0a7
// Combinaison des flux avec zip
// L'opérateur zip combine les éléments de flux1 et flux2 par leur position
// La fonction de combinaison prend un élément de chaque flux et les formate en une chaîne
Flux<String> zippedFlux = Flux.zip(flux1, flux2, (a, b) -> "%s - %s".formatted(a, b));
// Abonnement au flux combiné pour imprimer les valeurs combinées
zippedFlux.subscribe(System.out::println);
1 - 6
2 - 7
3 - 8
4 - 9
5 - 10
reactor.core.publisher.LambdaSubscriber@36b8194b
import java.time.Duration;
// Création du premier flux qui émet un élément toutes les secondes
Flux<Integer> flux1 = Flux.interval(Duration.ofSeconds(1))
.map(Long::intValue) // Conversion des Long en Integer
.take(5); // Prendre les 5 premiers éléments
// Création du deuxième flux qui émet un élément toutes les deux secondes
Flux<Integer> flux2 = Flux.interval(Duration.ofSeconds(2))
.map(Long::intValue) // Conversion des Long en Integer
.take(5); // Prendre les 5 premiers éléments
// Combinaison des deux flux en utilisant l'opérateur combineLatest
// combineLatest émet une nouvelle valeur chaque fois que l'un des flux source émet une valeur
// La fonction de combinaison prend les derniers éléments émis par chaque flux et les combine en une chaîne formatée
Flux<String> combinedFlux = Flux.combineLatest(flux1, flux2, (a, b) -> "%s %s".formatted(a, b));
combinedFlux.subscribe(System.out::println);
reactor.core.publisher.LambdaSubscriber@58fa73d4
public interface Subscriber<T> {
// Called first when subscribing to a Publisher
// Provides the Subscription object to control the flow
// Should call subscription.request() to start receiving items
void onSubscribe(Subscription s);
// Called for each item emitted by the Publisher
// Can be called 0 to N times based on the number of items
// Will not be called after onError or onComplete
void onNext(T t);
// Called when an error occurs in the stream
// Terminal operation - no more calls will happen after this
// Must not be called after onComplete
void onError(Throwable t);
// Called when the Publisher has no more items to emit
// Terminal operation - no more calls will happen after this
// Must not be called after onError
void onComplete();
}
public class BackpressureSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
private final int batchSize = 3;
private int currentReceived = 0;
private static final int PROCESSING_TIME_MS = 1000;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
System.out.println("🔵 Initial request for " + batchSize + " items");
subscription.request(batchSize); // Demande initiale
}
@Override
public void onNext(T item) {
try {
Thread.sleep(PROCESSING_TIME_MS); // Traitement lent
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("📥 Received: " + item);
currentReceived++;
if (currentReceived == batchSize) {
currentReceived = 0;
System.out.println("🔄 Requesting next " + batchSize + " items");
subscription.request(batchSize);
}
}
@Override
public void onError(Throwable t) {
System.err.println("❌ Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("✅ Completed");
}
}
// Example usage
Flux<Integer> numberFlux = Flux.range(1, 10);
numberFlux.subscribe(new BackpressureSubscriber());
🔵 Initial request for 3 items
📥 Received: 1
1 0
📥 Received: 2
2 0
📥 Received: 3
🔄 Requesting next 3 items
3 0
3 1
📥 Received: 4
4 1
📥 Received: 5
4 2
📥 Received: 6
🔄 Requesting next 3 items
📥 Received: 7
4 3
📥 Received: 8
📥 Received: 9
🔄 Requesting next 3 items
4 4
📥 Received: 10
✅ Completed
Flux<Integer> fluxWithResume = Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Erreur à l'élément 3");
}
return i;
})
// Remplace par un autre flux en cas d'erreur
.onErrorResume(e -> Flux.just(10, 20, 30));
fluxWithResume.subscribe(System.out::println);
1
2
10
20
30
reactor.core.publisher.LambdaSubscriber@43edd53b
Concept du Backpressure
Contrôle par le Subscriber
request(n)
: Demande n élémentsrequest(Long.MAX_VALUE)
: Flux illimitécancel()
: Arrêt immédiatStratégies de Gestion
buffer()
drop()
latest()
error()
import java.time.Duration;
Flux<Long> bufferedFlux = Flux.interval(Duration.ofMillis(10)) // Émission très rapide
// Stocke les éléments dans un buffer de taille 10 en cas de backpressure
.onBackpressureBuffer(10, i -> System.out.println("Buffered: " + i))
.take(50);
bufferedFlux.subscribe(new BackpressureSubscriber());
System.out.println("Number of send elements: "+bufferedFlux.blockLast()); // Bloque l'exécution pour voir le résultat
🔵 Initial request for 3 items
Number of send elements: 49
Flux<Long> latestFlux = Flux.interval(Duration.ofMillis(10)) // Émission très rapide
.onBackpressureLatest() // Garde uniquement le dernier élément en cas de backpressure
.take(50);
latestFlux.subscribe(new BackpressureSubscriber());
latestFlux.blockLast();
🔵 Initial request for 3 items
📥 Received: 0
49
Concept et Utilisation
Flux
avec émission contrôléeMéthodes de Contrôle
connect()
: Démarre l’émissionautoConnect(n)
: Connexion après n abonnésrefCount(n)
: Gestion automatique connexion/déconnexionPatterns d’Utilisation
publish()
: Crée un ConnectableFluxreplay()
: Met en cache les élémentsshare()
: Combine publish et refCountCas d’Usage
import reactor.core.publisher.ConnectableFlux;
// Création d'un ConnectableFlux qui :
// 1. Génère une séquence de 1 à 3
// 2. Ajoute un délai d'une seconde entre chaque élément
// 3. Convertit les entiers en Long
// 4. Transforme le Flux en ConnectableFlux avec publish()
ConnectableFlux<Long> flux = Flux.range(1, 3)
.map(Long::valueOf) // Conversion en Long
.publish(); // Création ConnectableFlux
// Premier abonnement - ne reçoit rien tant que connect() n'est pas appelé
flux.subscribe(i -> System.out.println("Subscriber 1: " + i));
// Attente de 3 secondes avant le second abonnement
Thread.sleep(3000);
// Second abonnement - recevra les mêmes données que le premier à partir du moment où connect() est appelé
flux.subscribe(i -> System.out.println("Subscriber 2: " + i));
// Démarre l'émission des éléments pour tous les abonnés
// Les deux abonnés vont recevoir les mêmes valeurs simultanément
flux.connect();
📥 Received: 0
📥 Received: 1
📥 Received: 1
📥 Received: 2
🔄 Requesting next 3 items
📥 Received: 2
🔄 Requesting next 3 items
📥 Received: 3
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2
Subscriber 1: 3
Subscriber 2: 3
reactor.core.publisher.FluxPublish$PublishSubscriber@3c6669e8
Principes Fondamentaux - Test déclaratif des flux réactifs - Vérification pas à pas du comportement - Support du backpressure - Tests synchrones et asynchrones
Méthodes Principales - expectNext()
: Vérifie valeur suivante - expectNextCount()
: Nombre d’éléments - expectComplete()
: Vérifie fin normale - expectError()
: Vérifie erreur - verifyTimeout()
: Test avec timeout - ajouter la dépendance suivante à votre fichier pom.xml
:
%maven io.projectreactor:reactor-test:3.7.0
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
Flux<String> testedFlux = Flux.just("A", "B", "C");
StepVerifier.create(testedFlux)
.expectNext("A")
.expectNext("B")
.expectNext("C")
.expectNext("D") // Erreur attendue
.verifyComplete();
📥 Received: 3
EvalException: expectation "expectNext(D)" failed (expected: onNext(D); actual: onComplete())
Execution exception
---------------------------------------------------------------------------
java.lang.AssertionError: expectation "expectNext(D)" failed (expected: onNext(D); actual: onComplete())
at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue$10(DefaultStepVerifierBuilder.java:509)
at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2289)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1529)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1477)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onComplete(DefaultStepVerifierBuilder.java:1117)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSubscribe(DefaultStepVerifierBuilder.java:1161)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:891)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:831)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:823)
at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:690)
at .(#220:1)
Opérateur log()
- Trace tous les événements du flux - Signaux: onNext, onError, onComplete - Requêtes backpressure - Options de configuration: - Niveau de log - Logger personnalisé - Catégorie
Points de Contrôle - checkpoint()
- Capture stack trace - Identifie source d’erreurs - Impact performance minimal
Flux<Integer> flux = Flux.range(1, 2)
.log(); // Ajoute des logs pour chaque événement du flux
flux.subscribe(System.out::println);
05:49:05.930 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | request(unbounded)
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onNext(1)
1
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onNext(2)
2
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onComplete()
reactor.core.publisher.LambdaSubscriber@4599b7fb
Flux<Integer> flux = Flux.range(1, 5)
.map(i -> {
if (i == 3) {
throw new RuntimeException("Erreur à l'élément 3");
}
return i;
})
.checkpoint("Point de contrôle avant l'erreur");
flux.subscribe(System.out::println, Throwable::printStackTrace);
1
2
java.lang.RuntimeException: Erreur à l'élément 3
at REPL.$JShell$66K.lambda$do_it$$0($JShell$66K.java:57)
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Error has been observed at the following site(s):
*__checkpoint ⇢ Point de contrôle avant l'erreur
Original Stack Trace:
at REPL.$JShell$66K.lambda$do_it$$0($JShell$66K.java:57)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:131)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
at reactor.core.publisher.Flux.subscribe(Flux.java:8333)
at REPL.$JShell$124.do_it$($JShell$124.java:57)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at io.github.spencerpark.ijava.execution.IJavaExecutionControl.lambda$execute$1(IJavaExecutionControl.java:95)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
reactor.core.publisher.LambdaSubscriber@70550ed8
E. Bruno