Mettre en place KAFKA CONNECT et KAFKA pour faire du change data capture (CDC)
Je suis en train de monter un moteur d’analyse permettant d’analyser les logs d’un ESB.
L’outil standard stockes les évènements dans une base de données.
Afin d’être le moins intrusif possible dans le code des médiations, j’ai choisi de mettre en place un mécanisme de change data capture (CDC) avec KAFKA CONNECT et KAFKA. Pour ce faire j’ai déployé la solution confluent. Celle-ci permet entre autres d’avoir des connecteurs JDBC permettant la connexion aux bases de données relationnelles.
KAFKA est un système de messagerie distribué crée initialement par LinkedIn. Kafka connect fournit une interface en entrée et en sortie de ce dernier.
Si vous voulez plus d’informations, vous pouvez aller du coté de la présentation du devox 2016
Installation
J’ai utilisé la procédure décrite ici
Démarrage des différents démons
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties ./bin/kafka-server-start ./etc/kafka/server.properties ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties ./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
Configuration du connecteur JDBC pour KAFKA CONNECT
Il faut tout d’abord copier le driver jdbc dans le répertoire share/java/kafka-connect-jdbc/
Ensuite, il faut créer un fichier de configuration (ex. esb-logs.properties )
name=esb-log-source connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=10 connection.url=URL_JDBC table.whitelist=ma_table mode=timestamp timestamp.column.name=jobstart topic.prefix=prefix
Les contraintes de KAFKA CONNECT sont les suivantes
- Si on veut faire des extractions incrémentales ( ce qui est mon cas) , il faut soit un élément de type TIMESTAMP qui ne puisse pas être NULL, soit une PK auto-incrémentée.
- Les requêtes doivent être relativement assez simples. Personnellement, je préfère utiliser des vues qui me retournent l’exécution des requêtes SQL.
Exécution
Lancer la commande
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/esb-logs.properties
Pour vérifier que tout est bien dans le topic KAFKA
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic prefix- --from-beginning
Conclusion
Pour l’instant, j’ai réussi à extraire des données de ma base de données vers un broker KAFKA. La suite me permettra de les traiter en mode streaming dans SPARK.