Extraire les données d’une base de données relationnelle avec Spark
Me voila en train de tester Apache Spark.
Pour info, Spark est un projet libre du consortium Apache qui a le vent en poupe depuis quelques temps. Il permet entre autres de réaliser des opérations de type Map Reduce en mémoire
Je souhaite automatiser le chargement de données dans ELASTIC SEARCH et faire au passage un petit map reduce sur les données en entrée.
Voici ce que j’ai fait pour extraire les données d’une base relationnelle ORACLE via JDBC. Bien évidemment, ça fonctionne avec toutes les autres SGBDR accessibles via JDBC.
Configuration de Spark
J’utilise le mode « autonome » de spark, c.-à.d sans cluster hadoop.
Pour l’instant, je n’utilise que les modules streaming et sql.
Au préalable , il faut configurer la variable d’environnement SPARK_CLASSPATH
et ajouter le chemin vers le driver JDBC
Je n’ai pas téléchargé la distribution car j’utilise spark au travers des librairies utilisées dans mon programme.
Développement
J’ai décidé d’utiliser le langage SCALA pour développer mon module. Pourquoi SCALA me direz vous ? Parce que ça fait quelques années que je me dis que je dois m’y mettre ( peut être est-ce trop tard ….) et que selon plusieurs commentaires que j’ai pu avoir au DEVOXX, SCALA est clairement plus adapté aux traitements réalisés par SPARK
Dépendances
Voici la configuration de mon pom.xml build.sbt :
scalacOptions ++= Seq("-feature", "-language:postfixOps","-language:existentials") libraryDependencies <<= scalaVersion { scala_version => val sparkVersion = "1.5.1" Seq( "org.scalatest" %% "scalatest" % "2.2.5" % "test", "org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "test", "junit" % "junit" % "4.11" % "test", "org.apache.spark" %% "spark-streaming" % sparkVersion, "org.apache.spark" %% "spark-sql" % sparkVersion, "org.apache.spark" %% "spark-core" % sparkVersion, "com.typesafe" % "config" % "1.3.0", "com.oracle.weblogic" % "ojdbc7" % "12.1.3-0-0" ) }
Pour l’exemple, voici une classe « fourre tout » qui montre le chargement de Spark ainsi que l’extraction des données
/** * Classe démontrant l'extration des données d'une base de données sous la forme d'un dataframe */ object RDDTest { def main(args: Array[String]) { // configuration val sparkconf = new SparkConf().setAppName("dao").setMaster("local[2]") // chargement du contexte val sparkContext = SparkContext.getOrCreate(sparkconf) /* enregistrement du dialecte oracle pour pallier a un bug sur les NUMERIC */ JdbcDialects.registerDialect(new OracleDialect()) // chargement du contexte SPARK SQL val sqlcontext = new SQLContext(sparkContext) // lecture des données val data = sqlcontext.read.format("jdbc") .options(Map("url" -> "jdbc:oracle:thin:@host:1521/schema", "dbtable" -> "(SELECT col1,col2 FROM table)", "user" -> "user", "password" -> "pwd")).load() println("data.collectAsList()=" + data.collectAsList()) } }
Problèmes rencontrés
Gestion des champs numériques
Comme évoqué ci-dessus, j’ai eu un bug sur la gestion des NUMERIC
,
IllegalArgumentException: requirement failed: Overflowed precision
Comme moyen de contournement, j’ai (après quelques recherches sur le net) opté pour surcharger le dialecte JDBC:
class OracleDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { if (sqlType == Types.NUMERIC && typeName == "NUMBER" && size == 0) Some(LongType) else None } }
et enregistré le dialecte à l’exécution
JdbcDialects.registerDialect(new OracleDialect())
Présence du binaire winutils.exe sur windows 7 64bits
Si comme moi vous avez le bug référencé sur le JIRA du projet, il suffit de télécharger le binaire winutils.exe, puis de positionner la variable d’environnement au démarrage en mettant le bon chemin ( attention, il faut un sous répertoire « bin’ .
System.setProperty("hadoop.home.dir", "d:\\\\winutil\\\\")
And now, something completely different
Outre les difficultés liées à l’apprentissage simultané de plusieurs technologies, on y arrive. Le projet est bien documenté.
Maintenant, je vais créer plusieurs indicateurs et faire un chargement dans Elasticsearch. Je décrirai ça dans un prochain article