Créer un connecteur KAFKA CONNECT
Après avoir essayé le connecteur de confluent kafka-connect-jdbc, je me suis aperçu qu’il y avait encore quelques bugs qui le rendaient inutilisable dans mon environnement ( notamment avec les champs numériques).
J’ai donc décidé de créer mon propre connecteur. Certes celui-ci sera moins générique mais il correspondra à mon besoin ( ou pas …).
API utilisées
- JAVA8
- API KAFKA CONNECT
Configuration maven
Cette configuration me permet de créer un fat jar avec les dépendances nécessaires à la bonne exécution du connecteur.
Voici mon fichier pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"?utm_source=rss&utm_medium=rss xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?utm_source=rss&utm_medium=rss xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?utm_source=rss&utm_medium=rss http://maven.apache.org/xsd/maven-4.0.0.xsd">?utm_source=rss&utm_medium=rss <modelVersion>4.0.0</modelVersion> <groupId>info.touret.myconnect</groupId> <artifactId>myconnect</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>myconnect</name> <url>http://maven.apache.org</url>?utm_source=rss&utm_medium=rss <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <kafka.version>0.10.0.0</kafka.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>LATEST</version> <scope>test</scope> </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <version>LATEST</version> <scope>test</scope> </dependency> <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-api-mockito</artifactId> <version>1.6.5</version> <scope>test</scope> </dependency> <dependency> <groupId>org.powermock</groupId> <artifactId>powermock-module-junit4</artifactId> <version>1.6.5</version> <scope>test</scope> </dependency> <dependency> <groupId>com.oracle.weblogic</groupId> <artifactId>ojdbc7</artifactId> <version>12.1.3-0-0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-runtime</artifactId> <version>${kafka.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>${kafka.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.7.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>allinone</shadedClassifierName> <artifactSet> <includes> <include>*:*</include> </includes> </artifactSet> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <manifestEntries> <Main-Class></Main-Class> </manifestEntries> </transformer> </transformers> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
Développement
Comme le dit le guide de développement, il faut créer a minima une classe héritant de la classe SourceConnector et une classe héritant de SourceTask.
MySourceConnector
Cette classe permet la récupération de la configuration du connecteur.
public class MySourceConnector extends SourceConnector { public static final String TOPIC_CONFIG = "topic"; private static Logger logger = LoggerFactory.getLogger(MySourceConnector.class); public String version() { return AppInfoParser.getVersion(); } public void start(Map<String, String> map) { } public Class<? extends Task> taskClass() { return MySourceTask.class; } public List<Map<String, String>> taskConfigs(int maxTasks) { ArrayList<Map<String, String>> configs = new ArrayList<>(); Map<String, String> map = new HashMap<>(); map.put(TOPIC_CONFIG, "montopic"); configs.add(map); return configs; } public void stop() { } public ConfigDef config() { return new ConfigDef().define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Kafka Topic"); } }
MySourceTask
Cette classe gère l’exécution de l’extraction et chargement dans KAFKA. Elle permet dans la méthode start() de démarrer le connecteur et de lancer les ressources (connexions JDBC).
Comme le connecteur standard, je m’appuie sur un champ de type Timestamp. celui -ci me permet de créer un offset et de faire un parcours incrémental de mes résultats .
public class MySourceTask extends SourceTask { private String topic = null; private Logger logger = LoggerFactory.getLogger(MySourceTask.class); private Connection connection; private PreparedStatement preparedStatement; public void start(Map<String, String> props) { topic = props.get(MySourceConnector.TOPIC_CONFIG); logger.warn("Starting Task "); try { connection = createConnection(); // connexion jdbc classique preparedStatement = connection.prepareStatement("select *from mytable"); } catch (SQLException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } } public List<SourceRecord> poll() throws InterruptedException { logger.warn("Polling"); List<SourceRecord> results = new LinkedList<>(); try { // recuperation de la partition Map sourcePartition = Collections.singletonMap("table", "mytable")); // recuperation du dernier offset final Timestamp lastRecordedOffset = getLastRecordedOffset(sourcePartition); preparedStatement.setTimestamp(1,lastRecordedOffset); ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()) { // recuperation du champ de type timestamp permettant le chargement incremental Timestamp timestamp = resultSet.getTimestamp(18); // extraction de la ligne (oui je sais c'est bourrin) String line = new StringBuilder().append(resultSet.getString(1)).append(resultSet.getString(2)).append(timestamp).toString(); je cree un nouveau offset Map sourceOffSet = Collections.singletonMap("position", new Long(timestamp.getTime())); results.add(new SourceRecord(sourcePartition, sourceOffSet, topic, Schema.STRING_SCHEMA, line)); } } catch (SQLException e) { throw new InterruptedException(e.getMessage()); } return results; } /** * extraction du timestamp du dernier champ extrait. Par defaut, je positionne l'EPOCH */ private Timestamp getLastRecordedOffset(Map<String,Object> partition) { Map<String,Object> offset = context.offsetStorageReader().offset(partition); Timestamp lastRecordedOffset = Timestamp.from(EPOCH); if(offset !=null){ lastRecordedOffset = new Timestamp((Long)offset.getOrDefault("position",Timestamp.from(EPOCH))); } return lastRecordedOffset; } public void stop() { try { if (preparedStatement != null) { preparedStatement.close(); } if (preparedStatement != null) { connection.close(); } } catch (SQLException e) { logger.error(e.getMessage(), e); } } public String version() { return new MySourceConnector().version(); } }
Configuration nécessaire à l’exécution du plugin
Il faudra créer également un fichier properties contenant les informations suivantes :
name=my-connector-source connector.class=info.touret.MySourceConnector
Exécution
De la même manière que pour le connecteur standard…
Conclusion
Voila le squelette de mon connecteur crée. Pour l’instant les données sont sérialisées de manière un peu brutale. La prochaine étape sera de les mettre au format JSON. La suite dans un prochain numéro…