Patrón Type Class y Spark

En las pasadas entradas centradas en el patrón type class con título Patrón Type Class  y Patrón Type Class: definición de leyes y test, realicé una descripción de la estructura de dicho patrón. En la presente entrada, Patrón Type Class y Spark, me centraré en la definición de un API mediante el patrón type class utilizando Apache Spark.

Apache Spark es aquel motor de procesamiento y análisis de datos. Apache Spark es una solución muy utilizada en el ámbito del Big Data.

El problema del ejercicio solucionada con Apache Spark consiste en extraer los datos existentes en unas tablas de una base de datos, crear un fichero en formato parquet de los datos; y, dicho fichero, dejarlo en una estructura de directorios. Todo ello, lo más configurable posible.

La definición de las tablas fuentes de datos son tablas, con nombre y campos con un enfoque didáctico, es decir, son tablas con nombre no relevantes. Definiré dos tablas: la tabla libros, con los siguientes campos: codigo, nombre y población; y, la tabla dat_country, con los mismos campos.

El tipo básico del ejemplo es aquel tipo que puede trabajar con el valor parametrizado cuya definición es la siguiente: type Postgresql[T] = T. El tipo lo he identificado con Postgres porque es con la base de datos con la cual realizaré el ejercicio.

La definición del fichero sbt del ejecercicio es la siguiente:

import sbt.Keys.libraryDependencies
name := "EjemploTypeClassSpark"
version := "1.0"
scalaVersion := "2.11.9"
scalacOptions += "-Ypartial-unification" // 2.11.9+
val spark_version = "2.3.1" 
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % spark_version,
  "org.apache.spark" %% "spark-sql" % spark_version,
  "com.typesafe" % "config" % "1.3.2",
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

La definición del API con nombre Engine estará formada por una única función con nombre dataExtraction la cual realizará la extración de datos, creación de un fichero en formato parquet y escritura en un path sin datos de retorno, es decir, de tipo Unit. Así, la definición de la parte funcional del type class es la siguiente:

import java.util.{ Properties}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import conf.Configuration
import typeclass.DTO.DataConfiguration
import util.Util
trait Engine[P[_]] {
  def dataExtraction(): P[Unit]
}
object Engine extends EngineInstances with EngineSyntax
trait EngineSyntax{
  object syntax{
    def |-> [P[_]]()(implicit BDI:Engine[P]): P[Unit] = BDI.dataExtraction()
  }
}

En el snippet anterior, definimos los siguientes elementos: el trait Engine del API, la definición del objeto Engine que hereda de las instancias y se comporta como un elemento como EngineSyntax. En este ejercicio, por simplificar, se omite la definición del trait con la definición del lenguaje y las leyes matemáticas que debe de cumplir.

La definición de las instancias con los efectos de lado es la siguiente:

trait EngineInstances{
  def apply[P[_]](implicit BDI:Engine[P]): Engine[P] = BDI
  import typeclass.Types.Postgresql
  implicit object EnginePostgress extends Engine[Postgresql]{
  def logger = LoggerFactory.getLogger(this.getClass)
  def loadTableField(nombreTable:String): List[String] = {
    nombreTable match {
       case "pruebas.libros" => List("codigo","nombre","poblacion")
       case "pruebas.dat_country" => List("codigo","nombre","poblacion")
       case _ => List()
   }
 }
def loadTableToParquet( spark:SparkSession, dataConfiguration:DataConfiguration ): Unit = {
  val engineSpark = spark
    .read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", dataConfiguration.url)
    .option("dbtable", dataConfiguration.nameTable)
    .option("user", dataConfiguration.user )
    .option("password", dataConfiguration.password)
    .option("fetchsize", "1000")
    .load()
    val parameter: List[String] = loadTableField(dataConfiguration.nameTable) //: _*
    engineSpark.select( parameter.toSeq.head, parameter.tail.toSeq: _* ).write.format("parquet").save( Util.getFilePathTarget(dataConfiguration.nameBucket ,dataConfiguration.nameTable)   )
}
def runSpark(url:String, user:String, password:String, numTables:Int, prop:Properties): Either[String, Unit] = {
  val spark = SparkSession.builder.appName("EjemploTypeClassSpark").getOrCreate()
  for(num <- 1 to numTables){
    try{
      val nameTable = prop.getProperty("table_" + num)
      if(!nameTable.equals(null) && !nameTable.equals("")){
        logger.info(s"[*****] Número tabla: ${num}, nombre tabla: ${nameTable}.")
        loadTableToParquet(spark, DataConfiguration(numTables=num, nameTable=nameTable, url=url, user=user, password=password, nameBucket = prop.getProperty("bucketName")) )
      }else{
        logger.info(s"[*****] Número tabla: table_${num} VACÍA")
      }
   } catch {
     case ex: Exception => {
       logger.info(s"[*****] Error en la carga de la tabla con número table_${num}")
       logger.error(s"Exception: ${ex.getMessage}")
     }
   }
 }
 spark.close()
 Right(Unit)
}
override def dataExtraction(): Postgresql[Unit] = {
  for{
    environment <- Configuration.loadEnvironmentVariables.right
    properties <- Configuration.loadProperties(environment.configuration).right
    _ <- runSpark(environment.url, environment.user , environment.password, properties.numTables, properties.properties ).right
  }yield{
    Right(Unit)
  }
 }
}
[...]
}

En el snippet anterior, definimos un trait EngineInstances con un constructor y un objeto implícito EnginePostgress el cual hereda del API Engine. En este trait, si definimos otro tipo de parámetro, como por ejemplo:Either[String,T] definido como type Oracle[T] = Either[String,T], sería el lugar para su implementación.

El objeto EnginePostgress define la función dataExtraction la cual define el programa con las siguientes sentencias: primero, larga de variables de entorno; segunda, carga de la configuración de los ficheros de properties; tercero, las operaciones con Spark; y, por último, el retorno de un elemento de tipo Unit. Las tres funciones son funciones que retornan un contenedor binario de tipo Either.

La función runSpark es aquella función que realiza la constructuctión de una sesión de Spark para realizar una operación sobre una tabla configurada; dicha función, se realiza en la función loadTableToParquet.

La función loadTableToParquet es aquella función que realiza la carga de la configuración del motor Spark para una tabla de una base de datos determinada, extrae los datos en formato parquet y deja el fichero en una ubicación determinada.

La aplicación cliente del API Engine es la siguiente:

import typeclass.Engine.syntax._
import typeclass.Types.Postgresql
object App extends App{
  val startTime: Long = System.currentTimeMillis()
  |->[Postgresql]()
}

El fichero properties con la configuración con el nombre de las tablas, el número de tablas a extraer y el path con el directorio en donde se almacena el resultado es el siguiente:

bucketName=~/tmp/
num_tables=2
table_1=pruebas.libros
table_2=pruebas.dat_country

La variables de entorno a configurar para la ejecución de la aplicación son las siguientes:

export url=jdbc:postgresql://localhost:5432/prueba
export user=postgres
export password=password
export configuration=~/workspace/EjemploTypeClassSpark/src/main/resources/configuration_bdi.properties

Los componentes software para la carga de las variables de entorno y carga de ficheros properties nos las describo en la entrada para reducir el tamaño y por centrar el ejemplo en el patrón type class.

Para finalizar y poder ejecutar la aplicación en local con Apache Spark se ejecuta con el siguiente comando:

cd ~/scala/spark-2.3.1-bin-hadoop2.7/bin
spark-submit --driver-class-path postgresql-42.1.4.jre6.jar --class "App" --master local[2] ~/workspace/EjemploTypeClassSpark/target/scala-2.11/ejemplotypeclassspark_2.11-1.0.jar

El resultado de la ejecución es la creación del fichero con los datos extraídos en la carpeta ~/tmp/ del sistema de directorios.