Exemples de scripts Groovy pour produire et consommer des messages avec Apache Kafka

Si Apache Kafka permet d'exécuter des producteurs et des consommateurs de messages vers ou depuis des topics en mode console, il peut aussi être utile et pratique d'écrire ses propres producteurs et consommateurs, sous la forme de scripts Groovy.
Les exemples de scripts proposés ici tirent parti du système Grape de Groovy, pour télécharger automatiquement, si besoin, les librairies dépendantes ; on a ainsi des scripts autonomes s'exécutant grâce à la commande groovy.

Pour exécuter ces scripts, vous devez donc avoir Groovy installé sur votre machine, ainsi qu'avoir un serveur Apache Kafka 2.0.0 démarré localement (avec ZooKeeper).
Nous n'utiliserons qu'un seul topic, et les messages y seront écrits et lus au format JSON, ce qui permettra de montrer au passage comment définir et utiliser ses propres classes de sérialisation/désérialisation ; en effet, un message Kafka consistant en une paire composée d'une clé (optionnelle) et d'une valeur binaire, il est nécessaire d'opérer des conversions.

Commençons avec le script écrivant des messages dans un topic Kafka.

Script du producteur

Le script Groovy ci-après produit trois messages dans le topic groovy-events, vers le broker Kafka s'exécutant localement, ce qui est défini par la clé bootstrap.servers de la Map props ; notez que l'on y a aussi stipulé par quels types la clé et la valeur du message devaient être sérialisées.
Pour la valeur, on utilise une classe personnalisée, JsonSerializer, définie dans la clé value.serializer ; cette classe est déclarée dans le script, et permet de convertir la valeur d'un message en un tableau d'octets, grâce à la classe Groovy groovy.json.JsonOutput (méthode serialize de la classe JsonSerializer).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Grab('org.apache.kafka:kafka-clients:2.0.0')
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.common.serialization.StringSerializer

import groovy.json.JsonOutput

class JsonSerializer implements Serializer {
    @Override
    void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    byte[] serialize(String topic, data) {
        JsonOutput.toJson(data).getBytes('UTF-8')
    }

    @Override
    void close() {
    }
}

def props = [
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': StringSerializer,
    'value.serializer': JsonSerializer
]

def topic = 'groovy-events'

def producer = new KafkaProducer(props)

['Groovy', 'Grails', 'Micronaut'].each {
    def record = new ProducerRecord(topic, [msg: "$it rocks!"])
    def metadata = producer.send(record).get()

    println "Record sent to partition ${metadata.partition()}, with offset ${metadata.offset()}"
}

Si tout se passe bien, après l'envoi de chaque message, le script affiche dans quelle partition du topic le message aura été transmis, et sa position (offset) dans la partition.

Dans l'exemple de script, les valeurs des messages transmis ont une structure de la forme suivante, du fait de la conversion de la map (valeur transmise au constructeur ProducerRecord) en JSON :

1
2
3
{
    "msg": "Groovy rocks!"
}

Si dans notre exemple, les valeurs utilisées sont des objets de type Map, sachez que l'on pourrait également passer des listes d'objets ou des POGOs (Plain-Old Groovy Objects) pour avoir des structures plus complexes.
Pour plus d'informations sur la conversion d'objets Groovy en JSON, vous pouvez vous reporter sur la documentation de Groovy concernant JsonOutput.

Script du consommateur

Tâchons maintenant de souscrire aux messages de notre topic, ce qui nous permettra notamment de consommer les messages apparaissant dans le topic.
Le script présenté ci-dessous est un petit peu plus complexe, car on souscrit au topic groovy-events de manière indéfini jusqu'à ce l'on stoppe l'exécution du script à l'aide de la combinaison de touches CTRL+C.

Dans ce script, on a aussi recours à une map de propriétés (props) pour indiquer le serveur Kafka que l'on cible, ainsi que les types à utiliser pour opérer la désérialisation des messages reçus.
La valeur d'un message sera désérialisée par la classe personnalisée JsonDeserializer implémentée dans le script ; plus précisément par sa méthode deserialize qui, grâce à une instance de la classe Groovy groovy.json.JsonSlurper, permet de transformer le tableau d'octets en un objet Groovy.
Pour plus d'informations sur cette transformation, vous pouvez vous reporter sur la documentation de Groovy concernant JsonSlurper.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Grab('org.apache.kafka:kafka-clients:2.0.0')
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.StringDeserializer

import groovy.json.JsonSlurper

class JsonDeserializer implements Deserializer {
    def slurper = new JsonSlurper()

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    def deserialize(String topic, byte[] data) {
        slurper.parse(data)
    }

    @Override
    void close() {
    }
}

def props = [
    'bootstrap.servers': 'localhost:9092',
    'key.deserializer': StringDeserializer,
    'value.deserializer': JsonDeserializer,
    'group.id': 'GroovyScriptConsumer'
]

def topics = ['groovy-events']

// This closure handles a Kafka message
def consumeRecord = { record ->
    println "Consumer Record: (${record.key()}, ${record.value()}, ${record.partition()}, ${record.offset()})"
}

def consumerThread = new Thread() {
    def terminated = false

    void run() {
        def consumer = new KafkaConsumer(props)
        consumer.subscribe(topics)

        while (!terminated) {
            def records = consumer.poll(1_000)
            records.each consumeRecord
            consumer.commitAsync()
        }

        consumer.close()
        println 'Consumer ended'
    }
}

consumerThread.start()

addShutdownHook {
    consumerThread.with {
        terminated = true
        join()
    }  
}

Il y a deux remarques à soulever :

  • Le script est écrit de tel sorte que lorsque l'on détecte un arrêt par la combinaison de touches CTRL+C, la closure passée à addShutdownHook s'exécute afin de permettre un arrêt gracieux du consommateur ; ce dernier procédant à la récupération des messages Kafka dans son propre thread avec l'appel à la méthode poll().
  • Il est possible, comme cela est sous-entendu par le nom de variable topics (et son type implicite), qu'un consommateur (représenté par la classe KafkaConsumer) souscrive à plusieurs topics en même temps.

Dans ce script, pour plus de clarté, le traitement d'un message Kafka reçu a été séparé dans la closure consumeRecord. On y affiche simplement la clé, la valeur, la partition et l'offset du message.
Libre à vous donc, d'écrire dans le corps de la closure, le traitement qui vous conviendra.

Scripts en action

Vous pouvez voir ces deux exemples de script en action en respectant ces étapes :

  1. S'assurer que ZooKeeper et Kafka soient démarrés localement.
  2. Dans une invite de commandes, exécuter le script du consommateur qui sera alors à l'écoute des messages Kafka du topic groovy-events.
  3. Dans une autre invite de commandes, exécuter une ou plusieurs fois le script du producteur, qui publiera alors des messages dans le topic groovy-events.
  4. Observer alors les messages affichés dans les consoles.

Pour conclure, j'espère que ces exemples de script vous seront utiles, soit pour l'exploration d'Apache Kafka, soit pour en écrire de nouveaux selon vos propres besoins.