“Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala

En la presente entrada, “Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala”, describo cómo definir un proceso de streaming con Apache Spark con una fuente de datos Apache Kafka definido en lenguaje Scala.

La estructura del artículo está compuesta por los siguientes apartados:

  1.  Apache Kafka. En este apartado realizaré una breve presentación de Kafka, instalación y arranque de los elementos necesarios para el ejemplo.
  2.  Apache Spark. En este apartado realizaré una breve descripción de Spark streaming y la descripción del ejemplo a presentar.

Apache Kafka

Apache Kafka es aquella herramienta que permite construir pipeline de datos en tiempo real y streaming de aplicaciones. Apache kafka es tolerante a fallos y escalable horizontalmente.

Instalación.

El proceso de instalación es un proceso sencillo, simplemente, hay que realizar lo siguiente:

  1. Descarga del fichero comprimido con la herramienta.
  2. Descompresión del fichero descargado en una carpeta.
  3. Acceder a la carpeta principal y ejecutar los ficheros de inicio.

Para aquel lector interesado, existen varias imágenes de contenedores Docker de Kafka.

Inicio del Zookeeper y Kafka

Para iniciar Kafka es necesario ejecutar dos comandos: el primero, iniciar Zookeeper; y, el segundo, inicio del servidor de kafka. Para cada operación, es necesario la apertura de una consola. Así, los comandos son los siguientes:

  • Arranque de Zookeeper. La configuración de Zookeeper se encuentra en el fichero de configuración zookeeper.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar Zookeeper es el siguiente:
>./bin/zookeeper-server-start.sh config/zookeeper.properties
  • Arranque de kafka Server. La configuración de Apache Server se encuentra en el fichero de configuración server.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar el servidor de Kafka es el siguiente:
>./bin/kafka-server-start.sh config/server.properties

Creación de un topic de prueba

Apache Kafka trabaja con topics para el intercambio de información desde los productores hasta los consumidores. El comando para la creación del topic es el siguiente:

> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Las opciones del script tienen el siguiente significado:

  • –create: opción de creación del topic.
  • –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
  • –replication-factor 1: opción para la definición del número de replicas del topic; en nuestro caso, valor 1.
  • –partitions 1: opción para defininir el número de particiones del topic; en nuestro caso, valor 1.
  • –topic test: nombre del topic a crear; en nuestro caso, test.

Creación de un productor.

Para la creación de un productor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un productor; y, desde ésta, poder escribir aquel texto que se quiera generar.

El comando para el inicio del productor es el siguiente:

> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Una vez ejecutado, la consola se queda a la espera para la introducción del texto deseado.

Creación de un consumidor.

Para la creación de un consumidor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un consumidor; y, desde esta, poder leer aquel texto que ha generado desde el productor.

El comando para el inicio del consumidor es el siguiente:

> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Las opciones del script tienen el siguiente significado:

  • –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
  • –topic test: nombre del topic a crear; en nuestro caso, test.
  • –from-beginning: opción para la definición del tipo de recepción.

Prueba de funcionamiento

Para la realización de un prueba de un productor y un consumidor, no hay mas que arrancar el productor en una terminal; arrancar el consumidor en una segunda terminal; y, por último,  escribir en el productor aquel texto que se quiera enviar al consumidor; como resultado de la ejecución, se visualizará en la terminal del consumidor el texto insertado en la terminal del productor.

Apache Spark

Apache Spark es un cluster de computación de proposito general el cual provee API en varios lenguajes como Java, Python y Scala, además de un motor  optimizado para la generación de gráficos. También soporta herramientas de alto nivel como son: Spark SQL, para el tratamiento de estructuras de datos; Spark MLLib, para machine learning; GraphX para el proceso gráfico y, por último, Spark Streaming.

Apache Spark Streaming es una extensión del core de Apache Spark con un API de alto rendimiento, escalable con un proceso de ingesta de datos tolerante a fallos. Los datos pueden ser ingestados desde distintas fuentes como son Kafka, Flume, un socket TCP,…; una vez ingestado, pueden ser procesados por funciones de orden superior; y, por último, el resultado del proceso puede ser almacenado en una base de datos, un fichero HDFS o un dashboard.

Gráficamente, Spark Streaming se puede definir de la siguiente forma:

Definición del problema

El problema que planteo es el siguiente: conexión de Apache Streaming con Apache kafka a traves de un topic con nombre test para poder cuantificar el número de palabras introducidas en un mensaje Kafka enviado al topic test desde un productor.

Definición de dependecias

Las dependencias necesarias para la realización del programa de interconexión son las siguientes:

  1. Definición de la dependecia de Spark Core
  2. Definición de la dependeicna con Spark Streaming
  3. Definición del conector de Spark con Kafka.

El objeto con las dependencias queda como sigue:

object Dependencies {
  val sparkVersion = "2.3.1"
  lazy val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion
  lazy val sparkStreamming = "org.apache.spark" %% "spark-streaming" % sparkVersion
  lazy val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
}

El fichero build.sbt queda definido como sigue:

import Dependencies._
import sbt.Keys.libraryDependencies
ThisBuild / scalaVersion := "2.11.9"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"
lazy val root = (project in file("."))
.settings(
name := "ejem-spark",
scalacOptions += "-Ypartial-unification", // 2.11.9+
libraryDependencies += sparkCore,
libraryDependencies += sparkStreamming,
libraryDependencies += sparkStreamingKafka

Solución en Scala

La funcionalidad con la conexión a Kafka consiste en lo siguiente: definición del contexto de Spark y Spark Streaming, definición de la configuración a Kafka, creación del stream con la utilidad de Kafka, procesamiento del resultado; una vez definido, se realiza el arranque del contexto SparkStreaming y se queda a la espera de su finalización.

El código es el siguiente:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object EjemSparkStreamming {
  def exampleStreamming(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("EjemSparkStreamming-kafka")
    val ssc = new StreamingContext(conf, Seconds(2))
    val topics = "test" // lista de Topic de Kafka
    val brokers = "localhost:9092" // broker de Kafka
    val groupId = "0" // Identificador del grupo.
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
       ConsumerConfig.GROUP_ID_CONFIG -> groupId,
       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
  def main(args: Array[String]): Unit = {
    exampleStreamming()
  }
}

La configuración de conexión a Kafka se define en las variables topics, brokers y groupId. Topics, puede tener una lista de nombres de topic separados por comas; brokers, el endpoint de kafka; y, groupId, del grupo de topics, en nuestro caso no hemos definido. Todos los parámetros, se definen en la estructura Map kafkaParams.

KafkaUtils es aquel componente que realiza la definición del stream al cual se le pasa el contexto de Streaming, las estrategias de localización de los topic y la estrategia de consumidores.

Ejecución y prueba

Para realizar pruebas es necesario tener la infraestructura de Apache Kafka levantada y un productor arrancado; y, por la parte de Spark, arrancaremos la aplicación de forma normal. Así, ejecutaremos los siguiente pasos:

  • En la consola del productor, escribiremos el siguiente texto: “esto es una prueba de Streaming. esto es una prueba”
  • En la consola del programa, el cual estará ejecutándose constantemente, cada dos segundos, realizará la comprobación del topic con el siguiente escritura en la consola:
[...]
19/06/06 17:02:34 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 1329 bytes result sent to driver
19/06/06 17:02:34 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 8 ms on localhost (executor driver) (1/1)
-------------------------------------------
Time: 1559833354000 ms
-------------------------------------------
(es,2)
(una,2)
(Streaming.,1)
(de,1)
(esto,2)
(prueba,2)
[...]

 

Patrón Type Class y Spark

En las pasadas entradas centradas en el patrón type class con título Patrón Type Class  y Patrón Type Class: definición de leyes y test, realicé una descripción de la estructura de dicho patrón. En la presente entrada, Patrón Type Class y Spark, me centraré en la definición de un API mediante el patrón type class utilizando Apache Spark.

Apache Spark es aquel motor de procesamiento y análisis de datos. Apache Spark es una solución muy utilizada en el ámbito del Big Data.

El problema del ejercicio solucionada con Apache Spark consiste en extraer los datos existentes en unas tablas de una base de datos, crear un fichero en formato parquet de los datos; y, dicho fichero, dejarlo en una estructura de directorios. Todo ello, lo más configurable posible.

La definición de las tablas fuentes de datos son tablas, con nombre y campos con un enfoque didáctico, es decir, son tablas con nombre no relevantes. Definiré dos tablas: la tabla libros, con los siguientes campos: codigo, nombre y población; y, la tabla dat_country, con los mismos campos.

El tipo básico del ejemplo es aquel tipo que puede trabajar con el valor parametrizado cuya definición es la siguiente: type Postgresql[T] = T. El tipo lo he identificado con Postgres porque es con la base de datos con la cual realizaré el ejercicio.

La definición del fichero sbt del ejecercicio es la siguiente:

import sbt.Keys.libraryDependencies
name := "EjemploTypeClassSpark"
version := "1.0"
scalaVersion := "2.11.9"
scalacOptions += "-Ypartial-unification" // 2.11.9+
val spark_version = "2.3.1" 
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % spark_version,
  "org.apache.spark" %% "spark-sql" % spark_version,
  "com.typesafe" % "config" % "1.3.2",
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

La definición del API con nombre Engine estará formada por una única función con nombre dataExtraction la cual realizará la extración de datos, creación de un fichero en formato parquet y escritura en un path sin datos de retorno, es decir, de tipo Unit. Así, la definición de la parte funcional del type class es la siguiente:

import java.util.{ Properties}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import conf.Configuration
import typeclass.DTO.DataConfiguration
import util.Util
trait Engine[P[_]] {
  def dataExtraction(): P[Unit]
}
object Engine extends EngineInstances with EngineSyntax
trait EngineSyntax{
  object syntax{
    def |-> [P[_]]()(implicit BDI:Engine[P]): P[Unit] = BDI.dataExtraction()
  }
}

En el snippet anterior, definimos los siguientes elementos: el trait Engine del API, la definición del objeto Engine que hereda de las instancias y se comporta como un elemento como EngineSyntax. En este ejercicio, por simplificar, se omite la definición del trait con la definición del lenguaje y las leyes matemáticas que debe de cumplir.

La definición de las instancias con los efectos de lado es la siguiente:

trait EngineInstances{
  def apply[P[_]](implicit BDI:Engine[P]): Engine[P] = BDI
  import typeclass.Types.Postgresql
  implicit object EnginePostgress extends Engine[Postgresql]{
  def logger = LoggerFactory.getLogger(this.getClass)
  def loadTableField(nombreTable:String): List[String] = {
    nombreTable match {
       case "pruebas.libros" => List("codigo","nombre","poblacion")
       case "pruebas.dat_country" => List("codigo","nombre","poblacion")
       case _ => List()
   }
 }
def loadTableToParquet( spark:SparkSession, dataConfiguration:DataConfiguration ): Unit = {
  val engineSpark = spark
    .read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", dataConfiguration.url)
    .option("dbtable", dataConfiguration.nameTable)
    .option("user", dataConfiguration.user )
    .option("password", dataConfiguration.password)
    .option("fetchsize", "1000")
    .load()
    val parameter: List[String] = loadTableField(dataConfiguration.nameTable) //: _*
    engineSpark.select( parameter.toSeq.head, parameter.tail.toSeq: _* ).write.format("parquet").save( Util.getFilePathTarget(dataConfiguration.nameBucket ,dataConfiguration.nameTable)   )
}
def runSpark(url:String, user:String, password:String, numTables:Int, prop:Properties): Either[String, Unit] = {
  val spark = SparkSession.builder.appName("EjemploTypeClassSpark").getOrCreate()
  for(num <- 1 to numTables){
    try{
      val nameTable = prop.getProperty("table_" + num)
      if(!nameTable.equals(null) && !nameTable.equals("")){
        logger.info(s"[*****] Número tabla: ${num}, nombre tabla: ${nameTable}.")
        loadTableToParquet(spark, DataConfiguration(numTables=num, nameTable=nameTable, url=url, user=user, password=password, nameBucket = prop.getProperty("bucketName")) )
      }else{
        logger.info(s"[*****] Número tabla: table_${num} VACÍA")
      }
   } catch {
     case ex: Exception => {
       logger.info(s"[*****] Error en la carga de la tabla con número table_${num}")
       logger.error(s"Exception: ${ex.getMessage}")
     }
   }
 }
 spark.close()
 Right(Unit)
}
override def dataExtraction(): Postgresql[Unit] = {
  for{
    environment <- Configuration.loadEnvironmentVariables.right
    properties <- Configuration.loadProperties(environment.configuration).right
    _ <- runSpark(environment.url, environment.user , environment.password, properties.numTables, properties.properties ).right
  }yield{
    Right(Unit)
  }
 }
}
[...]
}

En el snippet anterior, definimos un trait EngineInstances con un constructor y un objeto implícito EnginePostgress el cual hereda del API Engine. En este trait, si definimos otro tipo de parámetro, como por ejemplo:Either[String,T] definido como type Oracle[T] = Either[String,T], sería el lugar para su implementación.

El objeto EnginePostgress define la función dataExtraction la cual define el programa con las siguientes sentencias: primero, larga de variables de entorno; segunda, carga de la configuración de los ficheros de properties; tercero, las operaciones con Spark; y, por último, el retorno de un elemento de tipo Unit. Las tres funciones son funciones que retornan un contenedor binario de tipo Either.

La función runSpark es aquella función que realiza la constructuctión de una sesión de Spark para realizar una operación sobre una tabla configurada; dicha función, se realiza en la función loadTableToParquet.

La función loadTableToParquet es aquella función que realiza la carga de la configuración del motor Spark para una tabla de una base de datos determinada, extrae los datos en formato parquet y deja el fichero en una ubicación determinada.

La aplicación cliente del API Engine es la siguiente:

import typeclass.Engine.syntax._
import typeclass.Types.Postgresql
object App extends App{
  val startTime: Long = System.currentTimeMillis()
  |->[Postgresql]()
}

El fichero properties con la configuración con el nombre de las tablas, el número de tablas a extraer y el path con el directorio en donde se almacena el resultado es el siguiente:

bucketName=~/tmp/
num_tables=2
table_1=pruebas.libros
table_2=pruebas.dat_country

La variables de entorno a configurar para la ejecución de la aplicación son las siguientes:

export url=jdbc:postgresql://localhost:5432/prueba
export user=postgres
export password=password
export configuration=~/workspace/EjemploTypeClassSpark/src/main/resources/configuration_bdi.properties

Los componentes software para la carga de las variables de entorno y carga de ficheros properties nos las describo en la entrada para reducir el tamaño y por centrar el ejemplo en el patrón type class.

Para finalizar y poder ejecutar la aplicación en local con Apache Spark se ejecuta con el siguiente comando:

cd ~/scala/spark-2.3.1-bin-hadoop2.7/bin
spark-submit --driver-class-path postgresql-42.1.4.jre6.jar --class "App" --master local[2] ~/workspace/EjemploTypeClassSpark/target/scala-2.11/ejemplotypeclassspark_2.11-1.0.jar

El resultado de la ejecución es la creación del fichero con los datos extraídos en la carpeta ~/tmp/ del sistema de directorios.

Patrón Type Class: definición de leyes y test

En la entrada anterior, Patrón Type Class , realicé la definición, descripción y mostré ejemplos del patrón type class. La estructura del patrón está compuesta por un conjunto de elementos trait y un objeto que se comporta como dichos trait. A este objeto, para ciertos tipos de datos, se pueden definir leyes matemáticas para poder realizar test de dicho componente.

En la presente entrada, Patrón Type Class: definición de leyes y test , realizaré la definición del type class monoiede para la operación lógica suma y producto. Para ello, definiré un Type Class con la estructura definida en la anterior entrada añadiendo la definición de las leyes.

Definición de Monoide

En la serie que llevo publicado de la librería Scalaz, publiqué un post con título Scalaz IV: Tipos etiquetados, propiedad asociativa y monoides , en donde se describe el concepto de Monoide, así como, la descripción de unos ejemplos.

En la programación funcional, aparecen escenarios en donde es necesario definir funciones binarias cuyos parámetros de entrada y de salida son del mismo tipo; este tipo de función, se define en la entidad Semigrupos. Desde un punto de vista del lenguaje Scala, un Semigrupo se define de la siguiente forma:

trait Semigroup[A]{
  def combine(x:A, y:A):A
}

El Semigrupo lo definimos con un trait que recibe un tipo parametrizado A y define una función combine cuyos dos parámetros de entrada y la salida son del mismo tipo.

Unos ejemplo matemáticos que representan el Semigroup[A] pueden ser los siguientes:

  1.  1 + 2
  2.  2 + 1
  3. 1 + (2 + 3)
  4. (1 + 2) + 3
  5. (1 * 2) * 3
  6. 1 * (2 * 3)

Como deducimos de los ejemplos anteriores: podemos decir que se cumple la propiedad asociativa, comparando los resultados de los puntos de los ejemplo 1-2, 2-3 y 5-6. Así, podemos afirmar que Semigroup[A] cumple la propiedad asociativa; pero, en función de la operación que se aplique, puede no cumplir esta regla; por ejemplo, la operación resta, no cumple la propiedad asociativa. Unos ejemplos pueden ser los siguientes:

  1. 1 – (2 – 3)
  2. (1 – 2) – 3

De los ejemplos anteriores de la operación resta, el resultado de las operaciones es distinto en los ejemplos del punto 1 y 2.

Las operaciones matemáticas pueden cumplir otras propiedades; como por ejemplo, la propiedad de identidad. La propiedad de identidad necesita un valor vacío o valor zero el cual, en función de la operación, el valor del elemento no varía; por ejemplo: en la operación de suma, el valor zero o vacío es el valor 0; y, para la operación de multiplicación, el valor zero o vacío es el valor 1. Unos ejemplos con el entero 2 y aplicando el elemento vacío, cuyo resultado no cambia el entero, son los siguientes:

  1. Operación suma por la izquierda: 2 + 0 = 2
  2. Operación suma por la derecha: 0 + 2 = 2
  3. Operación multiplicación por la izquierda: 2 * 1 = 2
  4. Operación multiplicación por la derecha: 1 * 2 = 2

Llegado a este punto, los escenarios de conjuntos de elementos en donde se definen unas funciones que cumple las propiedades de asociatividad e identidad, son definidos como Monoides. La definición de Monoide en lenguaje Scala es la siguiente:

trait Monoid[A] extends Semigroup[A]{
  def empty: A
}

En los siguientes apartados, realizaré la descripción de monoides para las funciones lógicas suma (OR) y multiplicación (AND).

Monoide: operación lógica suma (OR)

La definición del Type class de la operación lógica suma (OR) en lenguaje Scala es la siguiente:

trait MonoidSuma[A] extends Monoid[A]{}
object MonoidSuma extends MonoidSumaInstances with MonoidSumaSyntax with MonoidSumaLaws
trait MonoidSumaInstances{
  def apply[A](implicit monoid: MonoidSuma[A]) = monoid
  implicit val monoidBooleanSuma = new MonoidSuma[Boolean] {
    // 1-FORMA
    // override def combine(x: Boolean, y: Boolean): Boolean = (x, y) match{
    // case (true, true ) => true
    // case (true, false) => true
    // case (false, true) => true
    // case (false, false) => false
    // }
    override def combine(x: Boolean, y: Boolean): Boolean = x || y
    override def empty: Boolean = false
  }
}
trait MonoidSumaSyntax{
  object syntax{
    def ++++[A](a:A, b:A)(implicit monoide: MonoidSuma[A]) = monoide.combine(a,b)
    def emptySuma [A](implicit monoide: MonoidSuma[A]) = monoide.empty
  }
}
trait MonoidSumaLaws{
  import MonoidSuma.syntax._
  trait Laws[A]{
    implicit val instance: MonoidSuma[A]
    def asociatividad(a1:A, a2:A, a3:A): Boolean = ++++( ++++(a1,a2), a3) == ++++(a1, ++++(a2,a3) )
    def izquierdaIdentidad(a1:A): Boolean = ++++( a1, emptySuma ) == a1
    def derechaIdentidad(a1:A): Boolean = ++++( emptySuma, a1 ) == a1
  }
  object Laws{
    def apply[A](implicit monoide:MonoidSuma[A]) = new Laws[A] {
      implicit val instance: MonoidSuma[A] = monoide // Dfinición de la referencia del trait.
    }
  }
}

La estructura del type class es la siguiente: trait MonoidSuma, objeto MonoidSuma, trait MonoidSumaInstances, trait MonoidSumaSyntax y MonoidSumaLaws. De la estructura de type class descrita en la entrada anterior, aparece el elemento nuevo trait MonoidSumaLaws en donde se define las funciones con las propiedades matemáticas de asociatividad y de identidad, así como, el objeto con el constructor del monoide.

Para realizar las pruebas del type class de la operación suma es necesario probar las leyes del type class y, para realizar las pruebas, se definen unos test que verifican las leyes matemáticas del type class los cuáles, para el type class función suma lógica, es el siguiente:

class MyMonoidTest extends FlatSpec with Matchers{
  "Test de las leyes del Monoide lógico Suma" should "cumple las leyes asociativas y de identidad" in {
    import es.ams.cap2monoidsemigroup.MonoidSuma.Laws
    val laws = Laws.apply
    assert( laws.asociatividad( true, false, true) == true)
    assert( laws.izquierdaIdentidad(true) == true )
    assert( laws.derechaIdentidad(true) == true )
  }
}

Monoide: operación lógica producto (AND)

La definición del Type class de la operación lógica producto es la siguiente:

trait MonoidProducto[A] extends Monoid[A]{}
object MonoidProducto extends MonoidProductoInstances with MonoidProductoSyntax with MonoidProductoLaws
trait MonoidProductoInstances{
  def apply[A](implicit monoid: MonoidProducto[A]) = monoid
  implicit val monoidProductoSuma = new MonoidProducto[Boolean] {
    // 1-FORMA
    // override def combine(x: Boolean, y: Boolean): Boolean = (x, y) match{
    // case (true, true ) => true
    // case (true, false) => false
    // case (false, true) => false
    // case (false, false) => false
    // }
    override def combine(x: Boolean, y: Boolean): Boolean = x && y
    override def empty: Boolean = true
  }
}
trait MonoidProductoSyntax{
  object syntax{
    def ****[A](a:A, b:A)(implicit monoide: MonoidProducto[A]) = monoide.combine(a,b)
    def emptyProducto [A](implicit monoide: MonoidProducto[A]) = monoide.empty
  }
}
trait MonoidProductoLaws{
  import MonoidProducto.syntax._
  trait Laws[A]{
    implicit val instance : MonoidProducto[A]
    def asociatividad(a1:A, a2:A, a3:A):Boolean = ****( ****(a1, a2), a3) == ****( a2, ****(a2, a3))
    def izquierdaIdentidad(a1:A):Boolean = ****(a1, emptyProducto) == a1
    def derechaIdentidad(a1:A):Boolean = ****(emptyProducto, a1) == a1
  }
  object Laws{
    def apply[A](implicit monoide:MonoidProducto[A]) = new Laws[A]{
      implicit val instance: MonoidProducto[A] = monoide
    }
  }
}

La estructura del type class es la siguiente: trait MonoidProducto, objeto MonoidProducto, trait MonoidProductoSyntax, trait MonoidProductoLaws y MonoidProductoLaws. De la estructura de type class descrita en la entrada anterior, aparece el elemento nuevo trait MonoidProductoLaws en donde se definen las funciones con las propiedades matemáticas de asociatividad y de identidad, así como, el objeto con el constructor del monoide.

Para realizar las pruebas del type class es necesario probar las leyes y, para realizar las pruebas, se define un test que verifican las leyes matemáticas del type class los cuáles, para el type class función producto lógica, es el siguiente:

class MyMonoidTest extends FlatSpec with Matchers{
  "Test de las leyes del Monoide lógico Producto" should "cumple las leyes asociativas y de identidad" in {
    import es.ams.cap2monoidsemigroup.MonoidProducto.Laws
    val laws = Laws.apply
    assert( laws.asociatividad( true, false, true) == true)
    assert( laws.izquierdaIdentidad(true) == true )
    assert( laws.derechaIdentidad(true) == true )
  }
}

La definición de las leyes se realiza utilizando las funciones definidas en la sintaxis y referenciando a los elementos implícitos de las instancias. ScalaTest es el framework seleccionado para la definición y ejecución de los test definidos de los type class.

Patrón Type Class

Las entradas que he publicado hasta la fecha, en su su mayoría, son descripciones y ejemplos de componentes de librerías como Scalaz o Circe. Todas las librerías aplican, en función del problema a resolver, un patrón común el cual es el Patrón Type Class. De la misma manera que en programación orientada a objetos está la clase y la herencia, en la programación funcional, se presenta el patrón Type Class que nos permite el polimorfismo en función del tipo de elementos a tratar.

El patrón Type Class apareció por primera vez con el lenguaje Haskell, lenguaje puramente funcional, para implementar operadores sobrecargados de aritmética e igualdad. En nuestro caso, el patrón type class lo utilizaremos para definir API’s.

La estructura del patrón type class está formado por cuatro elementos básicos los cuales son los siguientes:

  1. Definición del trait con la definición del API.
  2. Definición del trait con las instancias de los elementos que implementa el API en función del tipo.
  3. Definición del trait con la sintaxis.
  4. Definición del objeto que hereda de las instancias y se comportan como el resto de elementos trait.

Este patrón es utilizado en las librerías genéricas de Scala como Scalaz y Cats; librerías que complementan al propio lenguaje y solucionan determinados problemas de la programación funcional. Cada librería, organiza el patrón y estructura sus componentes de forma diferente; pero, en líneas generales, la estructura es la del patrón.

Para realizar la demostración, realizaré la implementación del patrón type class para diferentes funcionalidades.

API Impresión (Printable)

El API de impresión definirá la funcionalidad para realizar la conversión de tipos enteros, string y una entidad a tipo String para poder mostrar por consola. Evidentemente, el tipo String no tiene mucho sentido convertirlo porque ya es tipo String pero, realizaré la funcionalidad necesaria para que sea ilustrativa al lector.

El código del type class de impresión se define en el siguiente snippet del API Printable2 de la siguiente forma:

package es.ams.cap1introduccion
case class Cat(name:String, age:Int, color:String)
trait Printable2[A] {
  def format(a: => A):String
}
object Printable2 extends PrintableInstances2 with PrintableSyntax2
trait PrintableInstances2{
  def apply[A](implicit P:Printable[A]) = P
  implicit val printable2String = new Printable2[String]{
    def format(a: => String): String = a
  }
  implicit val printable2Int = new Printable2[Int]{
    def format(a: => Int): String = a.toString
  }
  implicit val printable2Cat = new Printable2[Cat]{
    def format(a: => Cat): String = a.name + " tiene " + a.age + " y es de color " + a.color
  }
}
trait PrintableSyntax2{
  object syntax{
    def format[A](elem: => A)(implicit P:Printable2[A]): String = P.format(elem)
      def printer[A](elem: => A)(implicit P:Printable2[A]): Unit = println(s"=>${P.format(elem)}")
      implicit class PrintableSyntax2Ops[A](elem: => A)(implicit P:Printable2[A]){
        def formatOps():String = P.format( elem )
        def printOps(): Unit = println( s" ===>${P.format(elem)}" )
      }
  }
}

El primer elemento del type class es el trait Printable2 para un tipo genérico A. El API define la función format el cual recibe un elemento de tipo A que lo transforma en un String.

El segundo elemento del type class es el object Printable2 que hereda de las instancias definidas en el trait Printable2Instances y se comporta como las funciones definidas en el trait Printable2Syntax.

El tecer elemento del type class es el trait Printable2Instances2 el cual define todos los elementos que implementan el API Printable2 para los tipos especificados. En nuestro caso, se implementan las instancias del API Printable2 para los siguientes tipos: String, con el objeto printable2String; Int, con el objeto printable2Int; y, Cat, con el elemento printable2Cat. Para los tres casos, la funcionalidad es sencilla, simplemente, los parámetros de la función format se pasan a String. Además, las tres implementaciones están definidas de forma implicita con la palabra implicit.

Por otro lado, para este tercer caso, es importante la definición de la función apply la cual realiza la construcción de aquella instancia que se requiere en función del tipo, representado por la letra A.

El cuarto y último elemento en este type class es el trait PrintableSyntax2 el cual define las aquellas funciones genéricas para los tipos definidos en el trait con las instancias. En nuestro caso, defino dos funciones y una clase. Las funciones definen las funciones helper y, la clase, define aquellas funciones para elementos de tipo A. Como puede analizar el lector, los elementos operativos son los objetos implícitos que se definen con los parámetros implicit.

A continuación, muestro unos ejemplos de uso de la utilización del API Printable2:

import Printable2.syntax._
println( "->" + format(69) )
println
printer( 89 )
println
val gato: Cat = Cat( name = "John", age=18, color="Blanco")
println( "-->" + format(gato) )
println
printer( Cat( name = "John", age=28, color="Rojo") )
println
val gato = Cat( name = "John", age=38, color="Verde")
println(s"Gato:${gato formatOps()}" )
println
val gato2 = Cat( name = "John", age=48, color="Rosa")
gato2.printOps()
println

La salida por consola es la siguiente:

->69
=>89
-->John tiene 18 y es de color Blanco
=>John tiene 28 y es de color Rojo
Gato:John tiene 38 y es de color Verde
===>John tiene 48 y es de color Rosa

Como observamos en los ejemplos, es necesaria la importación de la sintaxis y el compilador, tras la inferencia de tipos, infiere qué instancia implícita es la que tiene que utilizar.

API Visualización (Show)

En muchas ocasiones no es necesaria la creación de cualquier API porque las librerías genéricas Scalaz o Cats nos propocionan esas API. En el presente apartado, realizaré la encapsulación del API Show existente en la librería Cats. Este ejemplo sigue la misma estructura y es meramente ilustrativo.

Para realizar dicho ejemplo es necesario definir en el fichero build.sbt la dependencia con la librería Cats. La dependencia se define de la siguiente forma:

libraryDependencies += "org.typelevel" %% "cats-core" % "1.0.0-MF"

Para importar los elementos necesarios en la aplicación, se realiza de la siguiente forma:

import cats._
import cats.implicits._

La definición del API MyShow que encapsula el API Show de Cats es el siguiente:

trait MyShow[A] {
  def show(elem:A):String
}
object MyShow extends MyShowInstances with MyShowSyntax
trait MyShowInstances{
  def apply[A](implicit S:MyShow[A]) = S
  implicit val myShowInt = new MyShow[Int] {
    def show(elem:Int): String = {
      Show.apply[Int].show(elem)
    }
  }
  implicit val myShowString = new MyShow[String] {
    def show(elem:String): String = {
      elem.show
    }
  }
  implicit val myShowCat = new MyShow[Cat] {
    def show(elem:Cat): String = {
      elem.name.capitalize.show + " tiene " + elem.age.show + " y es de color " + elem.color.show
    }
  }
}
trait MyShowSyntax{
  object syntax{
    def show[A](elem:A)(implicit S: MyShow[A]) = S.show(elem)
    implicit class MyShowOps[A](elem:A)(implicit S: MyShow[A]){
      def show():String = S.show(elem)
      def =*=>():String = S.show(elem)
    }
  }
}

La estructura y elementos del APi son las mismas que en el caso del API Printable2. La diferencia reside en las instancias implícitas del trait MyShowInstances las cuáles utilizan el API Show de Cats.

Los ejemeplos de utilización del API MyShow son los siguientes:

import MyShow.syntax._
println( "[Syntax] show(69) = " + show(69) )
println
val gato: Cat = Cat(name="gato", age=18, color="Rosa")
println( "[Syntax] show(69) = " + 69 )
println
println( "[Syntax] =*=>()= " + gato.=*=>() )
println
println( "[Syntax] show()= " + gato.show() )
println

La salida por consola es la siguiente:

[Syntax] show(69) = 69
[Syntax] show(69) = 69
[Syntax] =*=>()= Gato tiene 18 y es de color Rosa
[Syntax] show()= Gato tiene 18 y es de color Rosa

Visión funcional

Una función es pura cuando en un programa se puede sustituir una función por el resultado de dicha función y, el funcionamiento del programa, sigue siendo el mismo. Una función no es pura cuando presenta efectos de lado los cuáles son todas aquellas operaciones que suponen a la función que tenga resultados distintos en cada ejecución; como por ejemplo: una operación de entrada-salida, una operación a una base de datos, o bien, una excepción.

En el patrón type class, se diferencian las funciones puras y las funciones que pueden presentar efectos de lado. Las funciones no puras son aquellas que se definen en las instancias del API y, las funciones puras, son las definidas en el trait de la sintaxis y en el API. Así, podemos definir un API entendible por negocio y, la parte de infraestructura, en las instancias; consiguiendo separar los dos ámbitos: el ámbito del mundo de negocio y el ámbito de la infraestructura.

Conclusión

La estructura del patrón Type Class es siempre la misma. Para su correcta entendimiento, es necesario tener claro cómo funcionan los elementos implícitos y, sobre todo, saber diferenciar los elementos funcionales puros y los elementos con efectos de lados; efectos, que suponen que las funciones no sean puras. Este patrón es utilizado por ejemplo para la implementación de Monoides, Funtores o Mónadas.

Scalaz X: Mónada estado

En la entrada anterior, Scalaz IX: Mónada Reader, realicé una descripción de una mónada de lectura; en esta nueva entrada, Scalaz X: Mónada estado, realizaré la descripción de la mónada estado. Para una mejor compresión de la mónada estado, es necesario entender lo que es la mónada de lectura.

Un mónada de estado es aquella mónada que dado un estado inicial, retorna una tupla con una estado siguiente y un valor. La definición matemática la podemos definir de la siguiente manera:

S' -> (S'', R)

Siendo: S’, el estado inicial; S”, el estado siguiente; y, R, el valor.

En Scalaz, la sintáxis de la mónada estado se define en el paquete scalaz.State. La mónada estado es pensada para aquellos problemas que requieran de una computación por estados.

Para realizar la explicación de la mónada estado, realicemos un ejemplo práctico. Definieremos la computación para determinar si se ha leído la palabra “01” y, para ello, necesitamos definir lo siguiente:

  1. Definir una variable estado, así como, sus posibles estados.
  2. Definir las transiciones de estado; transiciones, definidas con mónada estados.
  3. Flujo de lectura

La definición de un estado puede ser definida en un objeto de un determinado tipo, así como, los posibles estados por los que transite. Para el ejemplo, defino el tipo estado como un objeto entero y sus posibles estados. Además, defino el tipo del valor a retornar por la mónada estado.

type Estado = Int
val ESTADO_0:Int = 0 // Estado inicial
val ESTADO_1:Int = 1 // Se ha leído la letra 0
val ESTADO_2:Int = 2 // Estado final, se ha leído la palabra 01.
type Resultado = Boolean

Las transiciones entre estados están definidas con las mónadas estados, las cuales definen la funcionalidad a realizar. En nuestro problema, tenemos que definir dos transiciones: la primera, para determinar si se ha leído el 0; y, la segunda, para determinar si se ha leído el 1. Evidentemente para cada transición, es necesario pasar el valor del evento que produce la transición. En nuestro caso, las mónadas estados con las transiciones son las siguientes:

def leerCero (elemento:Int):State[Estado, Resultado] = State[Estado, Resultado] {
  case estado if ESTADO_0.equals(estado) && elemento.equals(0) => (ESTADO_1, false)
  case _ => (ESTADO_0, false)
}
def leerUno(elemento:Int):State[Estado, Resultado] = State[Estado, Resultado] {
  case estado if ESTADO_1.equals(estado) && elemento.equals(1) => (ESTADO_2, true)
  case _ => (ESTADO_1, false)
}

Un aspecto importante, es entender que el estado es inherente a la mónada estado y se asigna en la creación de la mónada;y, el valor del evento, es el valor pasado por parámetro. Así, una posible iteración es la definida en la siguiente función:

def ejemploSencillo:Unit = {
  val estado:Estado = ESTADO_0
  val (estado1, resultado1) = leerCero(0)(estado)
  println(s"estado1=${estado1} resultado1=${resultado1}")
  val (estado2, resultado2) = leerUno(1)(estado1)
  println(s"estado2=${estado2} resultado1=${resultado2}")
}

La salida por consola es la siguiente:

estado1=1 resultado1=false
estado2=2 resultado1=true

Del ejemplo anterior, se realiza la definición del estado inicial y se realizan las llamadas a las transiciones con los respectivos valores correspondientes a cada evento.

El ejemplo anterior es sencillo pero no es un caso muy real; para transformarlo, voy a definir la misma lógica determinando cuándo se ha leído desde consola la palabra “10”. Para ello, defino un bucle y el conjunto de transiciones. El ejemplo queda definido en el siguiente snippet:

def ejemploIterativo: Unit = {
  var estado: Estado = ESTADO_0
  var encontrada: Boolean = false
  while(!encontrada){
    print("Introduce un numero->")
    (estado) match {
    case ESTADO_0 => {
      val evento=scala.io.StdIn.readInt()
      val (estado0, resultado0) =leerCero(evento)(estado)
      estado = estado0
    }
    case ESTADO_1 => {
      val evento=scala.io.StdIn.readInt()
      val (estado0, resultado0) = leerUno(evento)(estado)
      estado = estado0
      if(resultado0){
        // encontrada = true
        println("PALABRA ENCONTRADA")
        estado = ESTADO_0
      }
   }
 }
}

La salida por consola es la siguiente:
Introduce un numero->0
Introduce un numero->1
PALABRA ENCONTRADA
Introduce un numero->
[...]

Como se observa en el ejemplo anterior, para el curioso lector, se ha dejado comentado el flag de asignación de fin de iteración.

Para el lector interesado, las entradas que he realizado sobre Scalaz hasta la fecha son las siguientes:

Scalaz IX: Mónada Reader

En la presente entrada, Scalaz IX: Mónada Reader, realizaré una descripción de ciertos tipos de funciones y abstracciones hasta llegar a definir una Monada de lectura o Mónada Reader, mónada que permite un parámetro de entrada.

En lenguaje Scala, una función se puede definir como una variable; esta función, es traducida como una clase. En el siguiente ejemplo, se definen las funciones f1 y f2 como variables y se muestra un ejemplo de uso. Con Scalaz, estas funciones se pueden interpretar como functores, con lo cual, podemos concatenar llamadas a dichas funciones con la función map. Esta situación, nos permite ejecutar una secuencia de funciones con un parámetro de entrada.

import scalaz.Scalaz._
val f1 = (_: Int) * 2
val f2 = (_: Int) + 10
println(s"f1(2)=${f1(2)}")
println
println(s"f2(2)=${f2(2)}")
println
println(s" (f1 map f2)(2)=${(f1 map f2) (2)} ") // Función map como functor
println

La salida por consola es la siguiente:

f1(2)=4
f2(2)=12
(f1 map f2)(2)=14

Como observamos en el ejemplo anterior, podemos definir una parametro de lectura identificando el tipo de entrada, en concreto, un valor de tipo entero.

Las funciones de tipo Applicative, nos permite ejecutar varias funciones y, una vez ejecutadas, realizar un cálculo con los resultados de las aplicaciones. En el siguiente ejemplo, muestro un ejemplo de una función applicative en donde se ejecutan dos funciones (f1 y f2) y sus resultado son sumados.

import scalaz.Scalaz._
val f1Applicative = ({
  (_: Int) * 2
} |@| {
  (_: Int) + 10
}) (_ + _)
println(s"f1Applicative(2)=${f1Applicative(2)}")
println

La salida por consola es la siguiente:

f1Applicative(2)=16

Como observamos en el ejemplo anterior, podemos definir una parametro de lectura identificando el tipo de entrada, en concreto, un valor de tipo entero.

Para finalizar, podemos definir una mónada, representado con un for comprehension, que define dos cálculos: el primero, la multiplicación de un tipo entero por dos; el segundo, la suma de un valor entero mas 10; y, para finalizar, la suma de su resultado. El código de ejemplo es el siguiente:

import scalaz.Scalaz._
val f1Monada: Int => Int = for {
  a <- (_: Int) * 2
  b <- (_: Int) + 10
} yield {
  a + b
}
println(s"f1Monada=${f1Monada(2)}")
println

La salida por consola es la siguiente:

f1Monada=16

Como observamos en el ejemplo anterior, podemos definir una mónada con un valor de entrada especificando su tipo. Así, podemos interpretar que una mónada es de lectura porque podemos definir valores de entrada.

Para el lector interesado, las entradas que he realizado sobre Scalaz hasta la fecha son las siguientes:

Scalaz VIII: Construcción de funciones con parámetros con Applicative

En la tercera entrada de la serie, Scalaz III: Apply y Applicative , me centre en la descripción de las API Apply y Applicative. El API Apply nos permite aplicar funciones a functores; y, el API Applicative, permite mapear funciones con N parámetros. En la presente entrada, Scalaz VIII: Construcción de funciones con parámetros con Applicative, describiré la forma de construir funciones de funciones con parámetros.

La definición de una función que realiza la suma de dos enteros de tipo Option con Applicative se puede realizar de la siguiente forma:

println(s"(3.some |@| 5.some) {_ + _}=${(3.some |@| 5.some) {_ + _}}" )

La salida por consola es la siguiente:

(3.some |@| 5.some) {_ + _}=Some(8)

La suma de estos dos elementos es sencilla pero, si necesito aplicar una serie de funciones para un valor de entrada, necesito definir una estructura de función parecida; para ello, definiremos tantas funciones como necesitemos con la definición de cada parámetro. En los siguientes apartados, muestro unos ejemplos de funciones con dos y con tres funciones.

Definición de una función de dos funciones

La definición de una función que realiza la suma de otras dos funciones las cuales, la primera, realiza la multiplicación de un entero por 2; y, la segunda, realiza la suma de un entero mas 10, se define de la siguiente forma:

val applicativeBuilder1 = ({(_:Int) * 2} |@| {(_:Int) + 10}) {_ + _}
println(s"( {(_:Int) * 2} |@| {(_:Int) + 10}) {_ + _}=${applicativeBuilder1(2)}")
println

La salida por consola es la siguiente:

({(_:Int) * 2} |@| {(_:Int) + 10}) {_ + _}=16

Definición de una función de tres funciones

La definición de una función que realiza la suma de tres funciones las cuáles, la primera, realiza la multiplicación de un entero por 2; la segunda, realiza la suma de un entero más 10; y, la tercera, realiza la suma de un entero mas 20, se define de la siguiente forma:

val applicativeBuilder2 = ({(_:Int) * 2} |@| {(_:Int) + 10} |@| {(_:Int) + 20}) {_ + _ + _}
println(s"({(_:Int) * 2} |@| {(_:Int) + 1}) {_ + _}=${applicativeBuilder2(2)}")
println

La definición de una función que realiza la multiplicación de dos enteros y al resultado se le suma una función las cuáles, la primera, realiza la multiplicación de un entero por 2; la segunda, realiza la suma de un entero más 10; y, la tercera, realiza la suma de un entero mas 20, se define de la siguiente forma:

val applicativeBuilder3 = ({(_:Int) * 2} |@| {(_:Int) + 10} |@| {(_:Int) + 20}) {_ * _ + _}
println(s"({(_:Int) * 2} |@| {(_:Int) + 1}) {_ * _ + _}=${applicativeBuilder3(2)}")
println

La salida por consola de las funciones son las siguientes:

({(_:Int) * 2} |@| {(_:Int) + 1}) {_ + _ + _}=38
({(_:Int) * 2} |@| {(_:Int) + 1}) {_ * _ + _}=70

En el caso que necesitemos tener funciones con distintos parámetros, utilizaremos el API Applicative como el descrito en la tercera entrada de la serie.

Para el lector interesado, las entradas que he realizado sobre Scalaz hasta la fecha son las siguientes: