Extraire les données d’une base de données relationnelle avec Spark

Me voila en train de tester Apache Spark.AAEAAQAAAAAAAAImAAAAJDcyMTQ0N2JkLWRjYzMtNDZjMy05OWQ4LTljNzFiM2M0NTg0Mw

Pour info, Spark est un projet libre du consortium Apache qui a le vent en poupe depuis quelques temps. Il permet entre autres de réaliser des opérations de type Map Reduce en mémoire

Je souhaite automatiser le chargement de données dans ELASTIC SEARCH et faire au passage un petit map reduce sur les données en entrée.

Voici ce que j’ai fait pour extraire les données d’une base relationnelle ORACLE via JDBC. Bien évidemment, ça fonctionne avec toutes les autres SGBDR accessibles via JDBC.

Configuration de Spark

J’utilise le mode « autonome » de spark, c.-à.d sans cluster hadoop.

Pour l’instant, je n’utilise que les modules streaming et sql.

Au préalable , il faut configurer la variable d’environnement SPARK_CLASSPATH et ajouter le chemin vers le driver JDBC

Je n’ai pas téléchargé la distribution car j’utilise spark au travers des librairies utilisées dans mon programme.

Développement

J’ai décidé d’utiliser le langage SCALA pour développer mon module. Pourquoi SCALA me direz vous ? Parce que ça fait quelques années que je me dis que je dois m’y mettre ( peut être est-ce trop tard ….) et que selon plusieurs commentaires que j’ai pu avoir au DEVOXX, SCALA est clairement plus adapté aux traitements réalisés par SPARK

Dépendances

Voici la configuration de mon pom.xml build.sbt :

scalacOptions ++= Seq("-feature", "-language:postfixOps","-language:existentials")

libraryDependencies <<= scalaVersion { scala_version =>
  val sparkVersion = "1.5.1"
  Seq(
    "org.scalatest" %% "scalatest" % "2.2.5" % "test",
    "org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "test",
    "junit" % "junit" % "4.11" % "test",
    "org.apache.spark" %% "spark-streaming" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "com.typesafe" % "config" % "1.3.0",
    "com.oracle.weblogic" % "ojdbc7" % "12.1.3-0-0"
  )
}

Pour l’exemple, voici une classe « fourre tout » qui montre le chargement de Spark ainsi que l’extraction des données

/**
 * Classe démontrant l'extration des données d'une base de données sous la forme d'un dataframe
 */
object RDDTest {

  def main(args: Array[String]) {
    // configuration
    val sparkconf = new SparkConf().setAppName("dao").setMaster("local[2]")
    // chargement du contexte
    val sparkContext = SparkContext.getOrCreate(sparkconf)
    /*
    enregistrement du dialecte oracle pour pallier a un bug sur les NUMERIC
         */

    JdbcDialects.registerDialect(new OracleDialect())
    // chargement du contexte SPARK SQL
    val sqlcontext = new SQLContext(sparkContext)


    // lecture des données
    val data = sqlcontext.read.format("jdbc")
           .options(Map("url" -> "jdbc:oracle:thin:@host:1521/schema",
      "dbtable" -> "(SELECT col1,col2 FROM table)",
      "user" -> "user",
      "password" -> "pwd")).load()

    println("data.collectAsList()=" + data.collectAsList())

  }
}

Problèmes rencontrés

Gestion des champs numériques

Comme évoqué ci-dessus, j’ai eu un bug sur la gestion des NUMERIC ,

IllegalArgumentException: requirement failed: Overflowed precision

Comme moyen de contournement, j’ai (après quelques recherches sur le net) opté pour surcharger le dialecte JDBC:

class OracleDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.NUMERIC && typeName == "NUMBER" && size == 0) Some(LongType) else None
  }
}

et enregistré le dialecte à l’exécution

JdbcDialects.registerDialect(new OracleDialect())

Présence du binaire winutils.exe sur windows 7 64bits

Si comme moi vous avez le bug référencé sur le JIRA du projet, il suffit de télécharger le binaire winutils.exe, puis de positionner la variable d’environnement au démarrage en mettant le bon chemin ( attention, il faut un sous répertoire « bin’ .

System.setProperty("hadoop.home.dir", "d:\\\\winutil\\\\")

And now, something completely different

Outre les difficultés liées à l’apprentissage simultané de plusieurs technologies, on y arrive. Le projet est bien documenté.

Maintenant, je vais créer plusieurs indicateurs et faire un chargement dans Elasticsearch. Je décrirai ça dans un prochain article

 

 

Vus : 719
Publié par Littlewing : 368