Example de microservice MicroProfile Reactive Messaging avec Apache Camel
Disponible depuis juillet dernier en version 1.0, l'API MicroProfile Reactive Messaging permet de développer des applications de flux de données réactifs, des microservices pilotés par des événements, et de mettre en oeuvre des modèles de type event sourcing (cf. MicroProfile Reactive Messaging 1.0 is now available), tout en utilisant un modèle de développement CDI.
Cette spécification est autonome et ne fait pas encore partie de la spécification parapluie MicroProfile.
Parmi les deux implémentations disponibles de cette spécification figure le projet SmallRye Reactive Messaging qui, pour reprendre la page du projet, fournit un support pour, entre autres, Apache Kafka, AMQP 1.0, MQTT et Apache Camel.
Dans la spécification Reactive Messaging, ils sont intégrés sous la forme de connecteurs assurant le transport de messages, par configuration de sources de données et/ou de destinataires de données.
Par exemple, on pourra tirer parti du connecteur Apache Kafka afin de permettre une communication asynchrone entre plusieurs microservices, ou encore recourir au connecteur Apache Camel pour disposer de la puissance de ce dernier en matière d'intégration.
Microservice microprofile-reactive
D'ailleurs, c'est le connecteur Apache Camel qui est mis en oeuvre dans le projet d'exemple microprofile-reactive que je partage sur Bitbucket, et dont il est question par la suite : il s'agit d'un microservice MicroProfile développé en langage Kotlin que l'on peut construire avec Gradle, et utilisant bien sûr Reactive Messaging.
Dans ce projet, on cherche simplement à détecter l'arrivée de nouveaux fichiers dans un répertoire spécifique, et effectuer un traitement particulier pour chaque ligne du fichier que l'on traite.
Et l'on passe par l'utilisation du composant Camel File pour la détection de fichiers.
Au niveau des dépendances du projet, en plus de celles à MicroProfile et SmallRye Reactive Messaging, dont notamment à smallrye-reactive-messaging-camel-1.0
pour son support d'Apache Camel, il a été nécessaire d'ajouter une dépendance à camel-cdi
(voir fichier build.gradle.kts
).
On a employé deux approches différentes, correspondant dans le projet à deux classes distinctes qui sont des beans CDI de protée application :
- La classe
com.odelia.microprofile.reactive.CamelReactive
qui s'appuie sur le service injecté dans un attribut de la classe de typeorg.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService
pour définir la route Camel servant à détecter la présence de fichiers dans le répertoire/temp/camel
. - La classe
com.odelia.microprofile.reactive.CamelReactiveWithConfig
, plus simple, qui effectue le même travail, mais qui se repose sur le fichier de configurationmicroprofile-config.properties
pour la définition de la route Camel, avec une surveillance du répertoire/temp/camel1
différent du précédent.
Voyons comment s'articule le traitement d'un fichier et de son contenu dans chaque approche.
Classe CamelReactive
La classe CamelReactive
du projet d'exemple est définie ainsi :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
Placées sur les méthodes de la classe, les annotations @Outgoing
et @Incomning
du projet Reactive Messaging, permettent d'indiquer respectivement, vers quel canal les données doivent être envoyées et de quel canal proviennent les données.
Un canal est un nom logique pouvant être mappé, par exemple, sur une queue distante, un topic, une adresse AMQP, ou encore une route Camel.
On peut ainsi former une chaîne de traitement, comme c'est le cas dans cette classe : dans la méthode source
, on utilise le service injecté de type CamelReactiveStreamsService
pour construire une route Camel - basée sur le composant File
pour surveiller le répertoire /temp/camel
-, et renvoyer un objet de type réactif, Publisher<Exchange>
, Exchange
étant un type de Camel.
Du fait de la présence de l'annotation @Outgoing("camel")
sur cette méthode, et de l'annotation @Incoming("camel")
sur la méthode consume
, le flux de données camel
consiste en la production de données (des objets Exchange
) par la méthode source
, et en leur consommation par la méthode consume
.
La méthode consume
obtient l'objet File
permettant de manipuler le fichier qui a été détecté par le composant Camel File
, puis à partir de la collection constituée des lignes du fichier, crée et retourne le type réactif Publisher<String>
.
Le traitement d'une ligne de fichier est séparé et réalisé dans la méthode handleText
: celle-ci consomme les chaînes de caractères provenant du canal text
(annotation @Incoming("text")
), ces dernières étant publiées grâce à la méthode consume
(annotation @Outgoing("text")
).
Dans notre exemple, handleText
se contente simplement de tracer la chaîne de caractères reçue.
Une méthode annotée avec @Incoming
et @Outgoing
, comme c'est le cas pour la méthode consume
, est appelée un processeur ou un médiateur.
Classe CamelReactiveWithConfig
Dans le code précédent, on a codé en dur l'URI utilisée pour la route Camel, et l'on pourrait certainement lire ce paramètre depuis une source de configuration MicroProfile Config ; cependant, utilisant également MicroProfile Config, SmallRye Reactive Messaging permet la configuration des canaux en suivant une certaine convention.
Dans l'application d'exemple, on a configuré le canal camel1
dans le fichier de configuration microprofile-config.properties
, de la manière suivante :
1 2 |
|
Essentiellement, on indique que les données entrantes du canal camel1
, lié au connecteur Camel, proviendront de la route d'URI file:///temp/camel1?delay=1000
.
La classe CamelReactiveWithConfig
qui est aussi un bean CDI de portée application, utilise l'annotation @Incoming("camel1")
sur la méthode consume
, et l'on a plus besoin de la méthode source
qui créait la route Camel grâce au service CamelReactiveStreamsService
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
Lorsqu'un fichier est détecté dans le réperoire /temp/camel1
, la méthode consume
le reçoit au travers du paramètre genfile
de type GenericFile<File>
, qui est un type du projet Apache Camel ;
la méthode consume
a en effet une signature qui le permet (voir Consuming one items and producing a streams).
Une autre signature autorisée, faisant apparaître le wrapper CamelMessage
(qui implémente l'interface org.eclipse.microprofile.reactive.messaging.Message
), pourrait être celle-ci :
1
|
|
Enfin, l'annotation @Outgoing("text1")
posée sur la méthode consume
permet l'envoi des lignes du fichier traité, vers la méthode handleText
annotée avec @Incoming("text1")
, sur le canal text1
.
SmallRye Reactive Messaging
Pour aller plus loin avec Reactive Messaging et en particulier SmallRye Reactive Messaging, je vous invite à lire le guide en ligne de ce dernier, afin de mieux comprendre les concepts que j'ai abordés dans cet article.
Nul doute qu'avec l'API Reactive Messaging et le support de différents connecteurs, on écrive des applications beaucoup plus claires et élégantes, tout en embrassant le paradigme de la programmation réactive !