Utilisation de spark en mode streaming
Après avoir intégré toutes les briques dans docker, il ne me « restait » plus qu’à coder la brique permettant de me connecter à une queue MQTT et d’insérer des données dans Elasticsearch.
J’ai décidé d’utiliser Apache Spark. Ce composant remplace Hadoop pour les traitements map reduce et permet d’exécuter ce genre de traitement dans plusieurs typologies d’environnement ( batch, cluster spark, cluster hadoop,…).
J’aime bien ce framework car il offre différentes possibilités et permet via un simple batch ( commande java -jar ) de lancer des traitements map reduce performants avec différents langages (scala, java,python, R).
Pour ce faire j’ai aussi décidé d’utiliser le langage SCALA. Spark est développé sur ce langage et fournit plus de fonctionnalités en scala que sur les autres.
Connexion à une queue MQTT
class MQTTStreaming(val sc: StreamingContext) { val conf = ConfigFactory.load() val logger = Logger.getLogger(this.getClass) def extractFromMQTT(): DStream[Log] = { val receiver = MQTTUtils.createStream(sc, conf.getString(ConfigurationKeys.MQTT_URL.toString), conf.getString(ConfigurationKeys.MQTT_TOPIC.toString)) return receiver.map(r => (new Log(UUID.randomUUID().toString, r, Date.from(Instant.now())))) } }
Sauvegarde dans Elasticsearch
Elasticsearch fournit une API permettant à des dérivés d’Hadoop de se connecter à un cluster Elasticsearch. L’utilisation est des plus simples.
def saveToES(array: Array[Log]): Unit = { EsSpark.saveToEs(sparkContext.makeRDD(array), configuration.getString(ConfigurationKeys.ES_INDEX.toString), getOptions()) }
Le programme
Logger.getLogger("org").setLevel(Level.WARN) Logger.getLogger("akka").setLevel(Level.WARN) val sparkContext = SparkContext.getOrCreate(new SparkUtils().getSparkConf()) val streamingContext = new StreamingContext(sparkContext, Duration.apply(10000)) val mqttStreaming = new MQTTStreaming(streamingContext) val stream = mqttStreaming.extractFromMQTT() val esloader = new ESLoader(sparkContext) stream.foreachRDD(rdd => { val collect = rdd.collect() esloader.saveToES(collect) }) streamingContext.start() streamingContext.awaitTermination() }
J’ai volontairement crée plus de variables qu’il ne faut car j’ai pas mal mis de logs
Je vais essayer de mettre mon code sur github prochainement.