Mettre en place KAFKA CONNECT et KAFKA pour faire du change data capture (CDC)

Je suis en train de monter un moteur d’analyse permettant d’analyser les logs d’un ESB.

L’outil standard stockes les évènements dans une base de données.

Afin d’être le moins intrusif possible dans le code des médiations, j’ai choisi de mettre en place un mécanisme de change data capture (CDC) avec KAFKA CONNECT et KAFKA. Pour ce faire j’ai déployé la solution confluent. Celle-ci permet entre autres d’avoir des connecteurs JDBC permettant la connexion aux bases de données relationnelles.

KAFKA est un système de messagerie distribué crée initialement par LinkedIn. Kafka connect fournit une interface en entrée et en sortie de ce dernier.

Si vous voulez plus d’informations, vous pouvez aller du coté de la présentation du devox 2016

Installation

J’ai utilisé la procédure décrite ici

Démarrage des différents démons

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties 
./bin/kafka-server-start ./etc/kafka/server.properties 
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties 
./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties

Configuration du connecteur JDBC pour KAFKA CONNECT

Il faut tout d’abord copier le driver jdbc dans le répertoire share/java/kafka-connect-jdbc/

Ensuite, il faut créer un fichier de configuration (ex. esb-logs.properties )

name=esb-log-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=URL_JDBC
table.whitelist=ma_table
mode=timestamp
timestamp.column.name=jobstart
topic.prefix=prefix

Les contraintes de KAFKA CONNECT sont les suivantes

  • Si on veut faire des extractions incrémentales ( ce qui est mon cas) , il faut soit un élément de type TIMESTAMP qui ne puisse pas être NULL, soit une PK auto-incrémentée.
  • Les requêtes doivent être relativement assez simples. Personnellement, je préfère utiliser des vues qui me retournent l’exécution des requêtes SQL.

Exécution

Lancer la commande

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/esb-logs.properties

Pour vérifier que tout est bien dans le topic KAFKA

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic prefix- --from-beginning

Conclusion

Pour l’instant, j’ai réussi à extraire des données de ma base de données vers un broker KAFKA. La suite me permettra de les traiter en mode streaming dans SPARK.

Vus : 1756
Publié par Littlewing : 368