%maven io.projectreactor:reactor-core:3.7.0
Programmation réactive avec Reactor
Introduction à la programmation réactive
- Évolution des systèmes:
- Besoin croissant: Les applications modernes nécessitent une gestion efficace des ressources et une haute disponibilité.
- Charges variables: Les systèmes doivent s’adapter dynamiquement aux variations de la charge utilisateur.
- Événements en temps réel: Capacité à traiter et à réagir aux événements dès qu’ils se produisent.
- Scalabilité: Les systèmes doivent pouvoir évoluer pour gérer un grand nombre d’utilisateurs et de données.
- Efficacité: Utilisation optimale des ressources pour maximiser les performances et réduire les coûts.
- Reactive Manifesto:
- Un document fondateur publié en 2013.
- Définit les principes de la programmation réactive pour construire des systèmes réactifs.
Définition de la programmation réactive
Programmation réactive: Un paradigme de programmation orienté vers les flux de données et la propagation des changements.
Réactivité: Capacité d’un système à réagir aux événements en temps réel.
Les principes de la programmation réactive
- Responsive: Le système doit répondre rapidement et de manière cohérente.
- Resilient: Le système doit rester fonctionnel même en cas de défaillance.
- Elastic: Le système doit s’adapter à la variation de la charge.
- Message Driven: Le système doit utiliser des messages asynchrones pour garantir une communication efficace.
Comparaison avec la programmation impérative et asynchrone
- Programmation impérative: Basée sur des instructions séquentielles, moins adaptée aux systèmes réactifs.
- Programmation asynchrone: Utilise des callbacks, des promesses ou des futures pour gérer les opérations non bloquantes.
- Programmation réactive: Étend la programmation asynchrone avec des flux de données et une gestion plus fine des événements.
Des API réactives dans différents langages
- Java:
- Project Reactor: Une bibliothèque pour la programmation réactive basée sur la Reactive Streams API.
- RxJava: Une bibliothèque pour la programmation réactive basée sur le modèle des Observables.
- JavaScript:
- RxJS: Une bibliothèque pour la programmation réactive en JavaScript, utilisée notamment avec Angular.
- Bacon.js: Une bibliothèque pour la programmation réactive fonctionnelle en JavaScript.
- Python:
- RxPY: Une implémentation de Reactive Extensions pour Python.
- Streamz: Une bibliothèque pour la manipulation de flux de données en temps réel.
- C#:
- Reactive Extensions (Rx.NET): Une bibliothèque pour la programmation réactive en .NET.
- System.Reactive: Une autre implémentation de Reactive Extensions pour .NET.
- Kotlin:
- Kotlin Flow: Une API pour la programmation réactive dans Kotlin, intégrée à Kotlin Coroutines.
- RxKotlin: Une extension de RxJava pour Kotlin.
Introduction à la Reactive Streams API
- Reactive Streams API:
- Une spécification standard pour le traitement des flux de données asynchrones avec gestion de la pression (backpressure).
- Conçue pour assurer une communication fluide et efficace entre les composants réactifs.
- Objectifs:
- Interopérabilité: Permettre l’interopérabilité entre différentes bibliothèques réactives, telles que Project Reactor, RxJava, et Akka Streams.
- Gestion efficace des flux de données: Assurer que les producteurs de données ne submergent pas les consommateurs, en régulant le flux de données pour éviter les surcharges et les blocages.
- Asynchronisme: Faciliter le traitement asynchrone des flux de données pour améliorer les performances et la réactivité des applications.
- Backpressure: Introduire des mécanismes pour gérer la pression exercée par les producteurs de données sur les consommateurs, permettant ainsi une meilleure gestion des ressources et une prévention des surcharges.
- Comparaison avec Java Streams API:
- Java Streams API: Conçue pour le traitement de collections de données en mémoire.
- Reactive Streams API: Conçue pour le traitement de flux de données asynchrones avec gestion de la pression.
- Push vs Pull: Java Streams utilise un modèle pull, tandis que Reactive Streams utilise un modèle push avec gestion de la pression.
Exemple de Reactive Streams API en Java avec Project Reactor
import reactor.core.publisher.Flux;
<Integer> flux = Flux.range(1, 10)
Flux.map(i -> i * 2)
.filter(i -> i % 3 == 0);
.subscribe(System.out::println); flux
6
12
18
reactor.core.publisher.LambdaSubscriber@289fec35
Introduction à Project Reactor
- Présentation de Project Reactor:
- Project Reactor est une bibliothèque pour la programmation réactive en Java, basée sur la Reactive Streams API.
- Elle permet de créer des applications réactives, non bloquantes et hautement performantes.
- Installation et configuration de Reactor dans un projet Java:
Ajouter la dépendance Maven suivante à votre fichier
pom.xml
:dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.7.0</version> <dependency> </
Mono
- représente une séquence asynchrone qui émet zéro ou un élément.
- Utilisé pour les opérations qui retournent un seul résultat ou aucun.
<String> mono = Mono.just("Hello, Reactor!");
Mono.subscribe(System.out::println); mono
Hello, Reactor!
reactor.core.publisher.LambdaMonoSubscriber@40d8d1e5
Flux
- Représente une séquence asynchrone qui émet zéro, un ou plusieurs éléments.
- Utilisé pour les opérations qui retournent plusieurs résultats.
<Integer> flux = Flux.range(1, 5);
Flux.subscribe(System.out::println); flux
1
2
3
4
5
reactor.core.publisher.LambdaSubscriber@5ee33937
Publisher interface
- Interface fondamentale de Reactive Streams
- Source de données qui émet des éléments de manière asynchrone
- Point de départ du flux de données réactif
Mono
etFlux
implantent l’interface Publisher
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Génération de Flux
- Création depuis des Données
- Valeurs Discrètes
Flux.just(1, 2, 3)
Flux.fromArray(array)
Flux.fromIterable(list)
Flux.fromStream(stream)
- Génération Programmée
Flux.range(1, 10)
: Séquence de nombresFlux.interval(Duration)
: Émission périodiqueFlux.generate()
: Génération synchroneFlux.create()
: Génération avec backpressure
- Valeurs Discrètes
- Sources Asynchrones
- Temps
Flux.interval(Duration.ofSeconds(1))
Flux.timer(Duration.ofMillis(500))
- Callbacks
Flux.create(sink -> {...})
Flux.push(sink -> {...})
- CompletableFuture
Mono.fromFuture(future)
Flux.fromCompletionStage(stage)
- Temps
Opérateurs de Transformation
- map: Transformation synchrone élément par élément
- Conversion de types
- Formatage de données
- Calculs simples
- flatMap: Transformation asynchrone avec aplatissement
- Appels services externes
- Opérations I/O
- Flux imbriqués
- transform: Modification globale du flux
- Composition de transformations
- Réutilisation de logique
- Modification de la chaîne
- switchMap: Gestion des changements rapides
- Annulation automatique
- Dernier flux uniquement
- Recherche interactive
<Integer> flux = Flux.range(1, 5)
Flux.map(i -> i * 2); // Transformation des éléments
.subscribe(System.out::println); flux
2
4
6
8
10
reactor.core.publisher.LambdaSubscriber@454de8b5
Opérateurs de Filtrage dans Project Reactor
Filtrage Simple et Déduplication - filter
: Filtre selon prédicat - filter(x -> x > 0)
- filter(String::isNotEmpty)
distinct
: Élimine doublons- Utilise equals/hashCode
- Option keySelector
distinctUntilChanged
- Élimine doublons consécutifs
- Compare éléments adjacents
Limitation - take(n)
: n premiers éléments - takeLast(n)
: n derniers - takeWhile
: condition vraie - takeUntil
: jusqu’à condition
Saut d’Éléments - skip(n)
: Ignore n premiers - skipLast(n)
: Ignore n derniers - skipWhile
: Ignore si vrai - skipUntil
: Ignore jusqu’à vrai
Sélection Spécifique - elementAt
: Position précise - next
: Premier élément - last
: Dernier élément - single
: Vérifie unicité
<Integer> flux = Flux.just(1, 2, 2, 3, 3, 3, 4, 6, 6, 7, 7, 10)
Flux.filter(i -> i % 2 == 0) // Garder les pairs
.filter(i -> i > 4) // Garder les nombres > 4
.distinct(); // Supprimer les doublons
.subscribe(System.out::println); flux
6
10
reactor.core.publisher.LambdaSubscriber@5ebb4980
Opérateurs de Combinaison de Flux
Fusion de Flux
merge
: Combine en intercalant- Ordre d’arrivée préservé
- Pas de synchronisation
- Parallèle
concat
: Concatène séquentiellement- Ordre strict respecté
- Séquentiel
- Sans entrelacement
mergeSequential
: Fusion ordonnée- Souscription parallèle
- Émission ordonnée
- Préserve ordre
Combinaison Synchronisée
zip
: Combine par paires- Attend les deux sources
- Transformation possible
- 1:1 matching
combineLatest
: Dernières valeurs- Combine valeurs récentes
- Réactif aux changements
- N:M possible
Utilitaires
withLatestFrom
: EnrichissementstartWith
: Valeurs initialessample
: Échantillonnage
// Création de deux flux
<Integer> flux1 = Flux.range(1, 5); // Flux de 1 à 4
Flux<Integer> flux2 = Flux.range(6, 10); // Flux de 6 à 9 Flux
// Combinaison des flux avec merge
// L'opérateur merge combine les flux en intercalant les éléments des deux flux
// Ensuite, on filtre les éléments pour ne garder que ceux qui sont divisibles par 4
<Integer> mergedFlux = Flux.merge(flux1, flux2)
Flux.filter(i -> i % 4 == 0); // Filtrage des éléments divisibles par 4
// Abonnement au flux combiné pour imprimer les valeurs filtrées
.subscribe(System.out::println); mergedFlux
4
8
12
reactor.core.publisher.LambdaSubscriber@67337d67
// Combinaison des flux avec concat
// L'opérateur concat combine les flux en les concaténant
// Ensuite, on filtre les éléments pour ne garder que ceux qui sont divisibles par 4
<Integer> concatenatedFlux = Flux.concat(flux1, flux2)
Flux.filter(i -> i % 4 == 0); // Filtrage des éléments divisibles par 4
// Abonnement au flux combiné pour imprimer les valeurs filtrées
.subscribe(System.out::println); concatenatedFlux
4
8
12
reactor.core.publisher.LambdaSubscriber@34fbf0a7
// Combinaison des flux avec zip
// L'opérateur zip combine les éléments de flux1 et flux2 par leur position
// La fonction de combinaison prend un élément de chaque flux et les formate en une chaîne
<String> zippedFlux = Flux.zip(flux1, flux2, (a, b) -> "%s - %s".formatted(a, b));
Flux
// Abonnement au flux combiné pour imprimer les valeurs combinées
.subscribe(System.out::println); zippedFlux
1 - 6
2 - 7
3 - 8
4 - 9
5 - 10
reactor.core.publisher.LambdaSubscriber@36b8194b
import java.time.Duration;
// Création du premier flux qui émet un élément toutes les secondes
<Integer> flux1 = Flux.interval(Duration.ofSeconds(1))
Flux.map(Long::intValue) // Conversion des Long en Integer
.take(5); // Prendre les 5 premiers éléments
// Création du deuxième flux qui émet un élément toutes les deux secondes
<Integer> flux2 = Flux.interval(Duration.ofSeconds(2))
Flux.map(Long::intValue) // Conversion des Long en Integer
.take(5); // Prendre les 5 premiers éléments
// Combinaison des deux flux en utilisant l'opérateur combineLatest
// combineLatest émet une nouvelle valeur chaque fois que l'un des flux source émet une valeur
// La fonction de combinaison prend les derniers éléments émis par chaque flux et les combine en une chaîne formatée
<String> combinedFlux = Flux.combineLatest(flux1, flux2, (a, b) -> "%s %s".formatted(a, b));
Flux
.subscribe(System.out::println); combinedFlux
reactor.core.publisher.LambdaSubscriber@58fa73d4
Interface Subscriber
- Interface fondamentale de Reactive Streams
- Consommateur de données qui reçoit les éléments de manière asynchrone
- Point de terminaison du flux de données réactif
public interface Subscriber<T> {
// Called first when subscribing to a Publisher
// Provides the Subscription object to control the flow
// Should call subscription.request() to start receiving items
void onSubscribe(Subscription s);
// Called for each item emitted by the Publisher
// Can be called 0 to N times based on the number of items
// Will not be called after onError or onComplete
void onNext(T t);
// Called when an error occurs in the stream
// Terminal operation - no more calls will happen after this
// Must not be called after onComplete
void onError(Throwable t);
// Called when the Publisher has no more items to emit
// Terminal operation - no more calls will happen after this
// Must not be called after onError
void onComplete();
}
public class BackpressureSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
private final int batchSize = 3;
private int currentReceived = 0;
private static final int PROCESSING_TIME_MS = 1000;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
System.out.println("🔵 Initial request for " + batchSize + " items");
.request(batchSize); // Demande initiale
subscription}
@Override
public void onNext(T item) {
try {
Thread.sleep(PROCESSING_TIME_MS); // Traitement lent
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("📥 Received: " + item);
++;
currentReceived
if (currentReceived == batchSize) {
= 0;
currentReceived System.out.println("🔄 Requesting next " + batchSize + " items");
.request(batchSize);
subscription}
}
@Override
public void onError(Throwable t) {
System.err.println("❌ Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("✅ Completed");
}
}
// Example usage
<Integer> numberFlux = Flux.range(1, 10);
Flux.subscribe(new BackpressureSubscriber()); numberFlux
🔵 Initial request for 3 items
📥 Received: 1
1 0
📥 Received: 2
2 0
📥 Received: 3
🔄 Requesting next 3 items
3 0
3 1
📥 Received: 4
4 1
📥 Received: 5
4 2
📥 Received: 6
🔄 Requesting next 3 items
📥 Received: 7
4 3
📥 Received: 8
📥 Received: 9
🔄 Requesting next 3 items
4 4
📥 Received: 10
✅ Completed
Gestion des erreurs avec onErrorReturn
<Integer> flux = Flux.range(1, 5)
Flux.map(i -> {
if (i == 3) {
throw new RuntimeException("Erreur à l'élément 3");
}
return i;
})
.onErrorReturn(-1); // Retourne -1 en cas d'erreur
.subscribe(System.out::println); flux
1
2
-1
reactor.core.publisher.LambdaSubscriber@2b991053
Gestion des erreurs avec onErrorResume
<Integer> fluxWithResume = Flux.range(1, 5)
Flux.map(i -> {
if (i == 3) {
throw new RuntimeException("Erreur à l'élément 3");
}
return i;
})
// Remplace par un autre flux en cas d'erreur
.onErrorResume(e -> Flux.just(10, 20, 30));
.subscribe(System.out::println); fluxWithResume
1
2
10
20
30
reactor.core.publisher.LambdaSubscriber@43edd53b
Gestion du Backpressure dans Project Reactor
Concept du Backpressure
- Régulation du flux producteur/consommateur
- Évite la surcharge du consommateur
- Communication bidirectionnelle
Contrôle par le Subscriber
request(n)
: Demande n élémentsrequest(Long.MAX_VALUE)
: Flux illimitécancel()
: Arrêt immédiat
Stratégies de Gestion
buffer()
- Stockage temporaire
- Mémoire tampon configurable
- Préserve les données
drop()
- Suppression des excédents
- Sans impact mémoire
- Perte acceptée
latest()
- Garde valeur récente
- Mise à jour continue
- Sampling
error()
- Signalement erreur
- Protection système
- Circuit breaker
import java.time.Duration;
<Long> bufferedFlux = Flux.interval(Duration.ofMillis(10)) // Émission très rapide
Flux// Stocke les éléments dans un buffer de taille 10 en cas de backpressure
.onBackpressureBuffer(10, i -> System.out.println("Buffered: " + i))
.take(50);
.subscribe(new BackpressureSubscriber());
bufferedFlux
System.out.println("Number of send elements: "+bufferedFlux.blockLast()); // Bloque l'exécution pour voir le résultat
🔵 Initial request for 3 items
Number of send elements: 49
<Long> latestFlux = Flux.interval(Duration.ofMillis(10)) // Émission très rapide
Flux.onBackpressureLatest() // Garde uniquement le dernier élément en cas de backpressure
.take(50);
.subscribe(new BackpressureSubscriber());
latestFlux
.blockLast(); latestFlux
🔵 Initial request for 3 items
📥 Received: 0
49
ConnectableFlux - Flux Partagé et Contrôlé
Concept et Utilisation
- Variante de
Flux
avec émission contrôlée - Partage un flux entre multiples abonnés
- “Hot” publisher - émission indépendante des abonnements
Méthodes de Contrôle
connect()
: Démarre l’émissionautoConnect(n)
: Connexion après n abonnésrefCount(n)
: Gestion automatique connexion/déconnexion
Patterns d’Utilisation
publish()
: Crée un ConnectableFluxreplay()
: Met en cache les élémentsshare()
: Combine publish et refCount
Cas d’Usage
- Diffusion d’événements
- Partage de données en temps réel
- Cache partagé
- Multicast de flux
import reactor.core.publisher.ConnectableFlux;
// Création d'un ConnectableFlux qui :
// 1. Génère une séquence de 1 à 3
// 2. Ajoute un délai d'une seconde entre chaque élément
// 3. Convertit les entiers en Long
// 4. Transforme le Flux en ConnectableFlux avec publish()
<Long> flux = Flux.range(1, 3)
ConnectableFlux.map(Long::valueOf) // Conversion en Long
.publish(); // Création ConnectableFlux
// Premier abonnement - ne reçoit rien tant que connect() n'est pas appelé
.subscribe(i -> System.out.println("Subscriber 1: " + i));
flux
// Attente de 3 secondes avant le second abonnement
Thread.sleep(3000);
// Second abonnement - recevra les mêmes données que le premier à partir du moment où connect() est appelé
.subscribe(i -> System.out.println("Subscriber 2: " + i));
flux
// Démarre l'émission des éléments pour tous les abonnés
// Les deux abonnés vont recevoir les mêmes valeurs simultanément
.connect(); flux
📥 Received: 0
📥 Received: 1
📥 Received: 1
📥 Received: 2
🔄 Requesting next 3 items
📥 Received: 2
🔄 Requesting next 3 items
📥 Received: 3
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2
Subscriber 1: 3
Subscriber 2: 3
reactor.core.publisher.FluxPublish$PublishSubscriber@3c6669e8
Introduction aux tests réactifs avec StepVerifier
Principes Fondamentaux - Test déclaratif des flux réactifs - Vérification pas à pas du comportement - Support du backpressure - Tests synchrones et asynchrones
Méthodes Principales - expectNext()
: Vérifie valeur suivante - expectNextCount()
: Nombre d’éléments - expectComplete()
: Vérifie fin normale - expectError()
: Vérifie erreur - verifyTimeout()
: Test avec timeout - ajouter la dépendance suivante à votre fichier pom.xml
:
dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.7.0</version>
<scope>test</scope>
<dependency> </
%maven io.projectreactor:reactor-test:3.7.0
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
<String> testedFlux = Flux.just("A", "B", "C");
Flux
.create(testedFlux)
StepVerifier.expectNext("A")
.expectNext("B")
.expectNext("C")
.expectNext("D") // Erreur attendue
.verifyComplete();
📥 Received: 3
EvalException: expectation "expectNext(D)" failed (expected: onNext(D); actual: onComplete())
Execution exception
---------------------------------------------------------------------------
java.lang.AssertionError: expectation "expectNext(D)" failed (expected: onNext(D); actual: onComplete())
at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue$10(DefaultStepVerifierBuilder.java:509)
at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2289)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1529)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1477)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onComplete(DefaultStepVerifierBuilder.java:1117)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSubscribe(DefaultStepVerifierBuilder.java:1161)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:891)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:831)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:823)
at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:690)
at .(#220:1)
Débogage et log pour le code réactif
Opérateur log()
- Trace tous les événements du flux - Signaux: onNext, onError, onComplete - Requêtes backpressure - Options de configuration: - Niveau de log - Logger personnalisé - Catégorie
Points de Contrôle - checkpoint()
- Capture stack trace - Identifie source d’erreurs - Impact performance minimal
<Integer> flux = Flux.range(1, 2)
Flux.log(); // Ajoute des logs pour chaque événement du flux
.subscribe(System.out::println); flux
05:49:05.930 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | request(unbounded)
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onNext(1)
1
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onNext(2)
2
05:49:05.931 [IJava-executor-13] INFO reactor.Flux.Range.2 -- | onComplete()
reactor.core.publisher.LambdaSubscriber@4599b7fb
<Integer> flux = Flux.range(1, 5)
Flux.map(i -> {
if (i == 3) {
throw new RuntimeException("Erreur à l'élément 3");
}
return i;
})
.checkpoint("Point de contrôle avant l'erreur");
.subscribe(System.out::println, Throwable::printStackTrace); flux
1
2
java.lang.RuntimeException: Erreur à l'élément 3
at REPL.$JShell$66K.lambda$do_it$$0($JShell$66K.java:57)
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Error has been observed at the following site(s):
*__checkpoint ⇢ Point de contrôle avant l'erreur
Original Stack Trace:
at REPL.$JShell$66K.lambda$do_it$$0($JShell$66K.java:57)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:131)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
at reactor.core.publisher.Flux.subscribe(Flux.java:8333)
at REPL.$JShell$124.do_it$($JShell$124.java:57)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at io.github.spencerpark.ijava.execution.IJavaExecutionControl.lambda$execute$1(IJavaExecutionControl.java:95)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
reactor.core.publisher.LambdaSubscriber@70550ed8
Points Clés à Retenir
- Évolution des Paradigmes
- Du synchrone à l’asynchrone
- Des threads aux flux réactifs
- Performance et scalabilité
- Approche Moderne
- Non-blocking par défaut
- Gestion des ressources optimisée
- Tests et monitoring intégrés
- Bonnes Pratiques
- Gestion des erreurs systématique
- Contrôle du backpressure
- Découpage en flux atomiques
- Monitoring continu