Example de microservice MicroProfile Reactive Messaging avec Apache Camel

Disponible depuis juillet dernier en version 1.0, l'API MicroProfile Reactive Messaging permet de développer des applications de flux de données réactifs, des microservices pilotés par des événements, et de mettre en oeuvre des modèles de type event sourcing (cf. MicroProfile Reactive Messaging 1.0 is now available), tout en utilisant un modèle de développement CDI.
Cette spécification est autonome et ne fait pas encore partie de la spécification parapluie MicroProfile.

Parmi les deux implémentations disponibles de cette spécification figure le projet SmallRye Reactive Messaging qui, pour reprendre la page du projet, fournit un support pour, entre autres, Apache Kafka, AMQP 1.0, MQTT et Apache Camel.
Dans la spécification Reactive Messaging, ils sont intégrés sous la forme de connecteurs assurant le transport de messages, par configuration de sources de données et/ou de destinataires de données.
Par exemple, on pourra tirer parti du connecteur Apache Kafka afin de permettre une communication asynchrone entre plusieurs microservices, ou encore recourir au connecteur Apache Camel pour disposer de la puissance de ce dernier en matière d'intégration.

Microservice microprofile-reactive

D'ailleurs, c'est le connecteur Apache Camel qui est mis en oeuvre dans le projet d'exemple microprofile-reactive que je partage sur Bitbucket, et dont il est question par la suite : il s'agit d'un microservice MicroProfile développé en langage Kotlin que l'on peut construire avec Gradle, et utilisant bien sûr Reactive Messaging.

Dans ce projet, on cherche simplement à détecter l'arrivée de nouveaux fichiers dans un répertoire spécifique, et effectuer un traitement particulier pour chaque ligne du fichier que l'on traite.
Et l'on passe par l'utilisation du composant Camel File pour la détection de fichiers.

Au niveau des dépendances du projet, en plus de celles à MicroProfile et SmallRye Reactive Messaging, dont notamment à smallrye-reactive-messaging-camel-1.0 pour son support d'Apache Camel, il a été nécessaire d'ajouter une dépendance à camel-cdi (voir fichier build.gradle.kts).

On a employé deux approches différentes, correspondant dans le projet à deux classes distinctes qui sont des beans CDI de protée application :

  • La classe com.odelia.microprofile.reactive.CamelReactive qui s'appuie sur le service injecté dans un attribut de la classe de type org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService pour définir la route Camel servant à détecter la présence de fichiers dans le répertoire /temp/camel.
  • La classe com.odelia.microprofile.reactive.CamelReactiveWithConfig, plus simple, qui effectue le même travail, mais qui se repose sur le fichier de configuration microprofile-config.properties pour la définition de la route Camel, avec une surveillance du répertoire /temp/camel1 différent du précédent.

Voyons comment s'articule le traitement d'un fichier et de son contenu dans chaque approche.

Classe CamelReactive

La classe CamelReactive du projet d'exemple est définie ainsi :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@ApplicationScoped
open class CamelReactive {
    private val logger = LoggerFactory.getLogger(CamelReactive::class.java)

    @Inject
    private lateinit var camelReactive: CamelReactiveStreamsService

    @Outgoing("camel")
    open fun source(): Publisher<Exchange> {
        return camelReactive.from("file:///temp/camel?delay=1000")
    }

    @Incoming("camel")
    @Outgoing("text")
    open fun consume(exchange: Exchange) : Publisher<String> {
        val file = exchange.`in`.getBody(File::class.java)
        return ReactiveStreams.fromIterable(file.readLines()).buildRs()
    }

    @Incoming("text")
    open fun handleText(text: String) : Unit {
        logger.info("Text: {}", text)
    }
}

Placées sur les méthodes de la classe, les annotations @Outgoing et @Incomning du projet Reactive Messaging, permettent d'indiquer respectivement, vers quel canal les données doivent être envoyées et de quel canal proviennent les données.
Un canal est un nom logique pouvant être mappé, par exemple, sur une queue distante, un topic, une adresse AMQP, ou encore une route Camel.

On peut ainsi former une chaîne de traitement, comme c'est le cas dans cette classe : dans la méthode source, on utilise le service injecté de type CamelReactiveStreamsService pour construire une route Camel - basée sur le composant File pour surveiller le répertoire /temp/camel -, et renvoyer un objet de type réactif, Publisher<Exchange>, Exchange étant un type de Camel.
Du fait de la présence de l'annotation @Outgoing("camel") sur cette méthode, et de l'annotation @Incoming("camel") sur la méthode consume, le flux de données camel consiste en la production de données (des objets Exchange) par la méthode source, et en leur consommation par la méthode consume.

La méthode consume obtient l'objet File permettant de manipuler le fichier qui a été détecté par le composant Camel File, puis à partir de la collection constituée des lignes du fichier, crée et retourne le type réactif Publisher<String>.

Le traitement d'une ligne de fichier est séparé et réalisé dans la méthode handleText : celle-ci consomme les chaînes de caractères provenant du canal text (annotation @Incoming("text")), ces dernières étant publiées grâce à la méthode consume (annotation @Outgoing("text")).
Dans notre exemple, handleText se contente simplement de tracer la chaîne de caractères reçue.

Une méthode annotée avec @Incoming et @Outgoing, comme c'est le cas pour la méthode consume, est appelée un processeur ou un médiateur.

Classe CamelReactiveWithConfig

Dans le code précédent, on a codé en dur l'URI utilisée pour la route Camel, et l'on pourrait certainement lire ce paramètre depuis une source de configuration MicroProfile Config ; cependant, utilisant également MicroProfile Config, SmallRye Reactive Messaging permet la configuration des canaux en suivant une certaine convention.

Dans l'application d'exemple, on a configuré le canal camel1 dans le fichier de configuration microprofile-config.properties, de la manière suivante :

1
2
mp.messaging.incoming.camel1.connector=smallrye-camel
mp.messaging.incoming.camel1.endpoint-uri=file:///temp/camel1?delay=1000

Essentiellement, on indique que les données entrantes du canal camel1, lié au connecteur Camel, proviendront de la route d'URI file:///temp/camel1?delay=1000.

La classe CamelReactiveWithConfig qui est aussi un bean CDI de portée application, utilise l'annotation @Incoming("camel1") sur la méthode consume, et l'on a plus besoin de la méthode source qui créait la route Camel grâce au service CamelReactiveStreamsService :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@ApplicationScoped
open class CamelReactiveWithConfig {
    private val logger = LoggerFactory.getLogger(CamelReactiveWithConfig::class.java)

    // Use channel camel1 defined in microprofile-config.properties
    @Incoming("camel1")
    @Outgoing("text1")
    open fun consume(genfile: GenericFile<File>) : Publisher<String> {
        return ReactiveStreams.fromIterable(genfile.file.readLines()).buildRs()
    }

    @Incoming("text1")
    open fun handleText(text: String) : Unit {
        logger.info("Text: {}", text)
    }
}

Lorsqu'un fichier est détecté dans le réperoire /temp/camel1, la méthode consume le reçoit au travers du paramètre genfile de type GenericFile<File>, qui est un type du projet Apache Camel ; la méthode consumea en effet une signature qui le permet (voir Consuming one items and producing a streams).
Une autre signature autorisée, faisant apparaître le wrapper CamelMessage (qui implémente l'interface org.eclipse.microprofile.reactive.messaging.Message), pourrait être celle-ci :

1
open fun consume(genfile: CamelMessage<GenericFile<File>>) : PublisherBuilder<String>

Enfin, l'annotation @Outgoing("text1") posée sur la méthode consume permet l'envoi des lignes du fichier traité, vers la méthode handleText annotée avec @Incoming("text1"), sur le canal text1.

SmallRye Reactive Messaging

Pour aller plus loin avec Reactive Messaging et en particulier SmallRye Reactive Messaging, je vous invite à lire le guide en ligne de ce dernier, afin de mieux comprendre les concepts que j'ai abordés dans cet article.

Nul doute qu'avec l'API Reactive Messaging et le support de différents connecteurs, on écrive des applications beaucoup plus claires et élégantes, tout en embrassant le paradigme de la programmation réactive !