Exemples de scripts Groovy pour produire et consommer des messages avec Apache Kafka
Si Apache Kafka permet d'exécuter des producteurs et des consommateurs de messages vers ou depuis des topics en mode console, il peut aussi être utile et pratique d'écrire ses propres producteurs et consommateurs, sous la forme de scripts Groovy.
Les exemples de scripts proposés ici tirent parti du système Grape de Groovy, pour télécharger automatiquement, si besoin, les librairies dépendantes ; on a ainsi des scripts autonomes s'exécutant grâce à la commande groovy
.
Pour exécuter ces scripts, vous devez donc avoir Groovy installé sur votre machine, ainsi qu'avoir un serveur Apache Kafka 2.0.0 démarré localement (avec ZooKeeper).
Nous n'utiliserons qu'un seul topic, et les messages y seront écrits et lus au format JSON, ce qui permettra de montrer au passage comment définir et utiliser ses propres classes de sérialisation/désérialisation ; en effet, un message Kafka consistant en une paire composée d'une clé (optionnelle) et d'une valeur binaire, il est nécessaire d'opérer des conversions.
Commençons avec le script écrivant des messages dans un topic Kafka.
Script du producteur
Le script Groovy ci-après produit trois messages dans le topic groovy-events
, vers le broker Kafka s'exécutant localement, ce qui est défini par la clé bootstrap.servers
de la Map props
; notez que l'on y a aussi stipulé par quels types la clé et la valeur du message devaient être sérialisées.
Pour la valeur, on utilise une classe personnalisée, JsonSerializer
, définie dans la clé value.serializer
; cette classe est déclarée dans le script, et permet de convertir la valeur d'un message en un tableau d'octets, grâce à la classe Groovy groovy.json.JsonOutput
(méthode serialize
de la classe JsonSerializer
).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
|
Si tout se passe bien, après l'envoi de chaque message, le script affiche dans quelle partition du topic le message aura été transmis, et sa position (offset) dans la partition.
Dans l'exemple de script, les valeurs des messages transmis ont une structure de la forme suivante, du fait de la conversion de la map (valeur transmise au constructeur ProducerRecord
) en JSON :
1 2 3 |
|
Si dans notre exemple, les valeurs utilisées sont des objets de type Map
, sachez que l'on pourrait également passer des listes d'objets ou des POGOs (Plain-Old Groovy Objects) pour avoir des structures plus complexes.
Pour plus d'informations sur la conversion d'objets Groovy en JSON, vous pouvez vous reporter sur la documentation de Groovy concernant JsonOutput.
Script du consommateur
Tâchons maintenant de souscrire aux messages de notre topic, ce qui nous permettra notamment de consommer les messages apparaissant dans le topic.
Le script présenté ci-dessous est un petit peu plus complexe, car on souscrit au topic groovy-events
de manière indéfini jusqu'à ce l'on stoppe l'exécution du script à l'aide de la combinaison de touches CTRL+C
.
Dans ce script, on a aussi recours à une map de propriétés (props
) pour indiquer le serveur Kafka que l'on cible, ainsi que les types à utiliser pour opérer la désérialisation des messages reçus.
La valeur d'un message sera désérialisée par la classe personnalisée JsonDeserializer
implémentée dans le script ; plus précisément par sa méthode deserialize
qui, grâce à une instance de la classe Groovy groovy.json.JsonSlurper
, permet de transformer le tableau d'octets en un objet Groovy.
Pour plus d'informations sur cette transformation, vous pouvez vous reporter sur la documentation de Groovy concernant JsonSlurper.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
|
Il y a deux remarques à soulever :
- Le script est écrit de tel sorte que lorsque l'on détecte un arrêt par la combinaison de touches
CTRL+C
, la closure passée àaddShutdownHook
s'exécute afin de permettre un arrêt gracieux du consommateur ; ce dernier procédant à la récupération des messages Kafka dans son propre thread avec l'appel à la méthodepoll()
. - Il est possible, comme cela est sous-entendu par le nom de variable
topics
(et son type implicite), qu'un consommateur (représenté par la classeKafkaConsumer
) souscrive à plusieurs topics en même temps.
Dans ce script, pour plus de clarté, le traitement d'un message Kafka reçu a été séparé dans la closure consumeRecord
. On y affiche simplement la clé, la valeur, la partition et l'offset du message.
Libre à vous donc, d'écrire dans le corps de la closure, le traitement qui vous conviendra.
Scripts en action
Vous pouvez voir ces deux exemples de script en action en respectant ces étapes :
- S'assurer que ZooKeeper et Kafka soient démarrés localement.
- Dans une invite de commandes, exécuter le script du consommateur qui sera alors à l'écoute des messages Kafka du topic
groovy-events
. - Dans une autre invite de commandes, exécuter une ou plusieurs fois le script du producteur, qui publiera alors des messages dans le topic
groovy-events
. - Observer alors les messages affichés dans les consoles.
Pour conclure, j'espère que ces exemples de script vous seront utiles, soit pour l'exploration d'Apache Kafka, soit pour en écrire de nouveaux selon vos propres besoins.