Utilisation de la plateforme CONFLUENT pour faire du CDC
Après une pause dans mon exploration de KAFKA et KAFKA CONNECT, je me suis remis à faire du CDC.
Cette fois j’ai décidé d’utiliser les possibilités natives de la plateforme CONFLUENT. A savoir le proxy REST et les plugins JDBC et ELASTICSEARCH.
J’ai donc mis en quelques coups de cuillères à pot :
- Une extraction de données incrémentale à partir d’une base de données
- Un chargement dans Elasticsearch
Création des connecteurs
Création du connecteur d’extraction
La plateforme CONFLUENT offre un ensemble d’APIS qui permettent la configuration des connecteurs.
Ex. :
Lecture d’un état de connecteur
curl -XGET http://127.0.0.1:8083/connectors?utm_source=rss&utm_medium=rss
Voila comment j’ai chargé la configuration :
J’ai d’abord crée un fichier JSON
{
"name": "jdbc-sql",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": 2,
"connection.url": "jdbc:oracle:thin:URL",
"mode": "incremental",
"timestamp.column.name": "JOBSTART",
"query":"select * from LOG",
"topic.prefix": "jdbc-avro-jdbc",
"table.types" : "VIEW"
}
}
puis je l’ai chargé via l’API en utilisant une requête POST
Création du connecteur déversant dans Elasticsearch
Même principe avec ce fichier JSON
{
"name": "jdbc-elk-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": 2,
"connection.url": "http://host:9200",?utm_source=rss&utm_medium=rss
"type.name": "monitor",
"topics": "jdbc-avro-jdbc",
"topic.key.ignore": "jdbc-avro-jdbc",
"topic.index.map":"jdbc-avro-jdbc:monitor",
"key.ignore": "true"
}
}
Avec cette configuration j’ai maintenant un cluster ELASTICSEARCH qui récupère au fil de l’eau mes données provenant de la base de données.