“Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala

En la presente entrada, “Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala”, describo cómo definir un proceso de streaming con Apache Spark con una fuente de datos Apache Kafka definido en lenguaje Scala.

La estructura del artículo está compuesta por los siguientes apartados:

  1.  Apache Kafka. En este apartado realizaré una breve presentación de Kafka, instalación y arranque de los elementos necesarios para el ejemplo.
  2.  Apache Spark. En este apartado realizaré una breve descripción de Spark streaming y la descripción del ejemplo a presentar.

Apache Kafka

Apache Kafka es aquella herramienta que permite construir pipeline de datos en tiempo real y streaming de aplicaciones. Apache kafka es tolerante a fallos y escalable horizontalmente.

Instalación.

El proceso de instalación es un proceso sencillo, simplemente, hay que realizar lo siguiente:

  1. Descarga del fichero comprimido con la herramienta.
  2. Descompresión del fichero descargado en una carpeta.
  3. Acceder a la carpeta principal y ejecutar los ficheros de inicio.

Para aquel lector interesado, existen varias imágenes de contenedores Docker de Kafka.

Inicio del Zookeeper y Kafka

Para iniciar Kafka es necesario ejecutar dos comandos: el primero, iniciar Zookeeper; y, el segundo, inicio del servidor de kafka. Para cada operación, es necesario la apertura de una consola. Así, los comandos son los siguientes:

  • Arranque de Zookeeper. La configuración de Zookeeper se encuentra en el fichero de configuración zookeeper.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar Zookeeper es el siguiente:
>./bin/zookeeper-server-start.sh config/zookeeper.properties
  • Arranque de kafka Server. La configuración de Apache Server se encuentra en el fichero de configuración server.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar el servidor de Kafka es el siguiente:
>./bin/kafka-server-start.sh config/server.properties

Creación de un topic de prueba

Apache Kafka trabaja con topics para el intercambio de información desde los productores hasta los consumidores. El comando para la creación del topic es el siguiente:

> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Las opciones del script tienen el siguiente significado:

  • –create: opción de creación del topic.
  • –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
  • –replication-factor 1: opción para la definición del número de replicas del topic; en nuestro caso, valor 1.
  • –partitions 1: opción para defininir el número de particiones del topic; en nuestro caso, valor 1.
  • –topic test: nombre del topic a crear; en nuestro caso, test.

Creación de un productor.

Para la creación de un productor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un productor; y, desde ésta, poder escribir aquel texto que se quiera generar.

El comando para el inicio del productor es el siguiente:

> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Una vez ejecutado, la consola se queda a la espera para la introducción del texto deseado.

Creación de un consumidor.

Para la creación de un consumidor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un consumidor; y, desde esta, poder leer aquel texto que ha generado desde el productor.

El comando para el inicio del consumidor es el siguiente:

> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Las opciones del script tienen el siguiente significado:

  • –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
  • –topic test: nombre del topic a crear; en nuestro caso, test.
  • –from-beginning: opción para la definición del tipo de recepción.

Prueba de funcionamiento

Para la realización de un prueba de un productor y un consumidor, no hay mas que arrancar el productor en una terminal; arrancar el consumidor en una segunda terminal; y, por último,  escribir en el productor aquel texto que se quiera enviar al consumidor; como resultado de la ejecución, se visualizará en la terminal del consumidor el texto insertado en la terminal del productor.

Apache Spark

Apache Spark es un cluster de computación de proposito general el cual provee API en varios lenguajes como Java, Python y Scala, además de un motor  optimizado para la generación de gráficos. También soporta herramientas de alto nivel como son: Spark SQL, para el tratamiento de estructuras de datos; Spark MLLib, para machine learning; GraphX para el proceso gráfico y, por último, Spark Streaming.

Apache Spark Streaming es una extensión del core de Apache Spark con un API de alto rendimiento, escalable con un proceso de ingesta de datos tolerante a fallos. Los datos pueden ser ingestados desde distintas fuentes como son Kafka, Flume, un socket TCP,…; una vez ingestado, pueden ser procesados por funciones de orden superior; y, por último, el resultado del proceso puede ser almacenado en una base de datos, un fichero HDFS o un dashboard.

Gráficamente, Spark Streaming se puede definir de la siguiente forma:

Definición del problema

El problema que planteo es el siguiente: conexión de Apache Streaming con Apache kafka a traves de un topic con nombre test para poder cuantificar el número de palabras introducidas en un mensaje Kafka enviado al topic test desde un productor.

Definición de dependecias

Las dependencias necesarias para la realización del programa de interconexión son las siguientes:

  1. Definición de la dependecia de Spark Core
  2. Definición de la dependeicna con Spark Streaming
  3. Definición del conector de Spark con Kafka.

El objeto con las dependencias queda como sigue:

object Dependencies {
  val sparkVersion = "2.3.1"
  lazy val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion
  lazy val sparkStreamming = "org.apache.spark" %% "spark-streaming" % sparkVersion
  lazy val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
}

El fichero build.sbt queda definido como sigue:

import Dependencies._
import sbt.Keys.libraryDependencies
ThisBuild / scalaVersion := "2.11.9"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"
lazy val root = (project in file("."))
.settings(
name := "ejem-spark",
scalacOptions += "-Ypartial-unification", // 2.11.9+
libraryDependencies += sparkCore,
libraryDependencies += sparkStreamming,
libraryDependencies += sparkStreamingKafka

Solución en Scala

La funcionalidad con la conexión a Kafka consiste en lo siguiente: definición del contexto de Spark y Spark Streaming, definición de la configuración a Kafka, creación del stream con la utilidad de Kafka, procesamiento del resultado; una vez definido, se realiza el arranque del contexto SparkStreaming y se queda a la espera de su finalización.

El código es el siguiente:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object EjemSparkStreamming {
  def exampleStreamming(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("EjemSparkStreamming-kafka")
    val ssc = new StreamingContext(conf, Seconds(2))
    val topics = "test" // lista de Topic de Kafka
    val brokers = "localhost:9092" // broker de Kafka
    val groupId = "0" // Identificador del grupo.
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
       ConsumerConfig.GROUP_ID_CONFIG -> groupId,
       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
  def main(args: Array[String]): Unit = {
    exampleStreamming()
  }
}

La configuración de conexión a Kafka se define en las variables topics, brokers y groupId. Topics, puede tener una lista de nombres de topic separados por comas; brokers, el endpoint de kafka; y, groupId, del grupo de topics, en nuestro caso no hemos definido. Todos los parámetros, se definen en la estructura Map kafkaParams.

KafkaUtils es aquel componente que realiza la definición del stream al cual se le pasa el contexto de Streaming, las estrategias de localización de los topic y la estrategia de consumidores.

Ejecución y prueba

Para realizar pruebas es necesario tener la infraestructura de Apache Kafka levantada y un productor arrancado; y, por la parte de Spark, arrancaremos la aplicación de forma normal. Así, ejecutaremos los siguiente pasos:

  • En la consola del productor, escribiremos el siguiente texto: “esto es una prueba de Streaming. esto es una prueba”
  • En la consola del programa, el cual estará ejecutándose constantemente, cada dos segundos, realizará la comprobación del topic con el siguiente escritura en la consola:
[...]
19/06/06 17:02:34 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 1329 bytes result sent to driver
19/06/06 17:02:34 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 8 ms on localhost (executor driver) (1/1)
-------------------------------------------
Time: 1559833354000 ms
-------------------------------------------
(es,2)
(una,2)
(Streaming.,1)
(de,1)
(esto,2)
(prueba,2)
[...]

 

Creación de un proyecto Kotlin con Gradle

Las herramientas de gestión de ciclo de vida difieren en cada lenguaje, o bien, hay herramientas que soportan la gestión para diferentes lenguajes; por ejemplo: sbt, permite la creación y gestión de proyectos en Scala y Java; maven, permite la creación de proyectos Java, Scala o Kotlin; y, Gradle, permite la creación y gestión de proyectos en Scala, Java o Kotlin. En la entrada de hoy, Creación de un proyecto Kotlin con Gradle, me centraré en la definición de un proyecto base en lenguaje Kotlin con Gradle.

1.- Gradle

Gradle es una herramienta open source para la automatización y gestión de ciclo de vida software con el cual podemos definir script en lenguaje Groovy o con un DSL Kotlin. Es una herramienta flexible, rápida, customizable y permite la gestión de proyectos de distinto lenguajes, como pueden ser: Java, Android, Scala, Kootlin,…

Para el lector interesado, la versión actual es la 5.4.1 y la la referencia a la documentación es la siguiente.

1.1.- Instalación de Gradle

El proceso de instalación es un proceso característico en función del entorno de trabajo, ya sea Linux, Windows o Mac. La referencia documental de instalación es la siguiente. 

En mi caso, el entorno de trabajo es sobre sistema Linux, en el cual el proceso de instalación consiste en lo siguiente: realizar la descarga de Gradle, descompresión del fichero descargado sobre un carpeta; y, por último, definir la variable de entorno. Para confirmar la instalación, se ejecuta el siguiente comando en una terminal:

gradle -v

La salida del comando tiene el siguiente aspecto:

 ------------------------------------------------------------
Gradle 5.4.1
------------------------------------------------------------

Build time: 2019-04-26 08:14:42 UTC
Revision: 261d171646b36a6a28d5a19a69676cd098a4c19d

Kotlin: 1.3.21
Groovy: 2.5.4
Ant: Apache Ant(TM) version 1.9.13 compiled on July 10 2018
JVM: 1.8.0_201 (Oracle Corporation 25.201-b09)
OS: Linux 4.6.7-040607-generic amd64

 

2.- Creación del proyecto

Una vez que tenemos Gradle instalado en la máquina, el proceso de creación del proyecto es sencillo y parecido a otras herramientas.

El proceso se define en los siguientes pasos:

  1. Creación de una carpeta del proyecto. La creación del directorio lo realizamos creando desde la línea de comandos una carpeta; en el presente caso, se ejecuta el siguiente comando: mkdir ejem2-kotlin. Una vez ejecutado, nos situamos en el interior de la carpeta mediante el comando cd ejem2-kotlin.
  2. Inicialización del proyecto. La inicialización del proyecto consiste en aquel proceso en el cual se realiza la creación de la estructura del proyecto. Dicho proceso, se realiza ejecutando el siguiente comando: gradle init. La consecuencia de este comando es la ejecución de un wizard en el cual se solicita una serie de cuestiones, entre ellos, selección de lenguaje (permite la creación de proyectos en Java, Korlin o Scala), nombre del proyecto y estructura de carpetas.

La estructura de carpetas del proyecto desde el visor de proyectos del IDE IntelliJ es el siguiente:

La estructura de carpetas y ficheros creados a destacar son los siguientes:

  • src.- Carpeta de código fuente con el aspecto típico, contiene la carpeta src/main/kotlin y src/test/kotlin; además, de las carpetas resources.
  • build.- carpeta con el contenido generado por el compilador kotlin.
  • out.- carpeta con el resultado de la compilación del proyecto.
  • gradle.- carpeta con los wrapper de gradle.
  • .gitignore.- fichero git.
  • build.gradle.kts.- fichero gradle con la configuración.
  • settings.gradle.kts.- fichero con propiedades.
  • gradlew.bat y gradlew.sh.- Fichero de ejecución Gradle del proyecto.

3.- Fichero de construcción build.gradle.kts

El fichero build.gradle.kts es aquel fichero en donde definimos la configuración necesaria para trabajar con el proyecto, como por ejemplo: las dependencias, repositorios y la clase principal.

La estructura del fichero está compuesta por los siguientes apartados:

  • plugins.- definición de las referencias de plugins existentes.
  • repositories.- definición de las referencias a los repositorios de artefactos.
  • dependencies.- definición de los artefactos dependientes en el proyecto.
  • application.- definición de la configuración de la aplicación, en concreto, la definición de la clase principal.

Un ejemplo de fichero build.gradle.kts para un proyecto en lenguaje Kotlin con las dependencias de la librería Arrow es la siguiente:

 import org.jetbrains.kotlin.kapt3.base.Kapt.kapt
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Kotlin application project to get you started.
*/
plugins {
  // Apply the Kotlin JVM plugin to add support for Kotlin on the JVM.
  id("org.jetbrains.kotlin.jvm").version("1.3.21")
  kotlin("kapt") version "1.3.31"
  // Apply the application plugin to add support for building a CLI application.
  application
}
repositories {
  // Use jcenter for resolving your dependencies.
  // You can declare any Maven/Ivy/file repository here.
  jcenter()
  mavenCentral()
}
val arrow_version = "0.9.0"
dependencies {
  // Use the Kotlin JDK 8 standard library.
  implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
  // Use the Kotlin test library.
  testImplementation("org.jetbrains.kotlin:kotlin-test")
  // Use the Kotlin JUnit integration.
  testImplementation("org.jetbrains.kotlin:kotlin-test-junit")
  compile( "io.arrow-kt:arrow-core-data:$arrow_version")
  compile( "io.arrow-kt:arrow-core-extensions:$arrow_version" )
  compile( "io.arrow-kt:arrow-syntax:$arrow_version")
  compile( "io.arrow-kt:arrow-typeclasses:$arrow_version")
  compile( "io.arrow-kt:arrow-extras-data:$arrow_version")
  compile( "io.arrow-kt:arrow-extras-extensions:$arrow_version")
  kapt( "io.arrow-kt:arrow-meta:$arrow_version")
  compile( "io.arrow-kt:arrow-query-language:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-free-data:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-free-extensions:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-mtl:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-data:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-extensions:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-io-extensions:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-rx2-data:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-rx2-extensions:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-reactor-data:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-effects-reactor-extensions:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-optics:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-generic:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-recursion-data:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-recursion-extensions:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-query-language:$arrow_version") //optional
  compile( "io.arrow-kt:arrow-integration-retrofit-adapter:$arrow_version") //optional
}
application {
  // Define the main class for the application.
  mainClassName = "es.ams.AppKt"
}

4.- Comandos Gradle

Gradle es una herramienta con la cual se ejecutan tareas con una funcionalidad determinada. En el presente apartado, identificaré un conjunto de comandos para la ejecución de ciertas tareas mínimas.

Un conjunto de tareas básicas son las siguientes:

  • gradle clean.- Eliminación de la carpeta build.
  • gradle build.- Construcción de los componentes binarios del proyecto.
  • gradle tasks.- visualización de las tareas definidas en el proyecto.
  • gradle properties.- visualización de las propiedades definidas en el proyecto.
  • gradle run.– Ejecución del proyecto.
  • gradle projects.- visualización de la información del proyecto.
  • gradle tests.- Ejecución de los test definidos en el proyecto.

Si se utiliza un IDE como IntelliJ, una vez importado y configurado la ubicación de Gradle, se pueden ejecutar desde la propia herramienta. Un aspecto de la estructura, contenido de clases y dependencias es el que se representa en la siguiente imagen:

5.- Conclusiones

La entrada define aspectos básicos y genéricos de Gradle. El objetivo de la entrada es la creación de un proyecto básico de Gradle en lenguaje Kotlin que permita a una persona realizar la creación de un proyecto y empezar a trabajar en dicho entorno.

Si realizamos un proceso comparativo a alto nivel con otros herramientas, el proceso es similar y, en concreto, me ha resultado una forma de trabajo parecida a la herramienta sbt para la gestión de proyectos en lenguaje Scala.

Validated: control de errores

En toda aplicación software es necesario realizar tareas de validación de campos, validación de formularios,o bien, verificación de una función;y, una vez realizadas las validaciones individuales, es necesario realizar la validación del conjunto de todas ellas. En la entrada de hoy, Validated: control de errores, me centraré en describir y realizar un ejemplo de validación de los campos de un formulario representado en una estructura de tipo Map.

Supongamos que estamos desarrollando una aplicación en la cual tenemos un formulario de dos elementos: el primero, el campo nombre; y, el segundo, el campo edad. Las validaciones que tenemos que realizar son: validación del campo nombre, validación del campo edad y evaluación de la validación del conjunto del formulario.

El campo nombre debe de ser un campo que no sea vacío. El campo edad debe de ser un
campo no vacío, entero y mayor que cero.

Las validaciones se realizan mediante el tipo Validated el cual no está definido como un tipo estándar de Scala, está definido en la librería Cats. La importación del tipo Validated y el resto de importaciones necesarias para el ejemplo son las siguientes:

import cats.data.Validated
import cats.instances.list._
import cats.syntax.all._

El primer paso a realizar es realizar la definición de los alias de todas las estructuras sobre las que trabajaremos. Los alias son los siguientes:

  • Definición del formulario. El formulario es una estructura de tipo Map.
type Form = Map[String, String]
  • Definición de la estructura de control de error sencillas. Las verificaciones
    sencillas se realizarán con un elemento de tipo Either
type ControlErrorFast[A] = Either[List[String], A]
  • Definición de la estructura de control de error compleja o validación. Las verificaciones complejas se realizarán con un elemento de tipo Validated.
type ValidatedForm[A] = Validated[List[String], A]

La primera operación a realizar es la obtención de un elemento del formulario. Para
realizar dicha operación, definiremos la función getValue de la siguiente manera:

def getValue(form: Form)(campo: String) : ControlErrorFast[String] = 
  form.get(campo)
    .toRight(List(s"El valor de $campo no está especificado"))

La segunda operación es realizar la verificación del campo nombre. Para realizar dicha operación, definimos la función readName y sus funciones auxiliares. El código es el siguiente:

def nonBlank(nombre:String)(dato:String): ControlErrorFast[String] =
   Right(dato)
    .ensure(List(s"El campo $nombre no debe de ser vacío."))
     (_.nonEmpty)

def readName(form: Form): ControlErrorFast[String] =
  getValue(form)("name")
   .flatMap( elem => nonBlank("name")(elem))

def nonBlank(nombre:String)(dato:String): ControlErrorFast[String] =
  Right(dato) 
   .ensure(List(s"El campo $nombre no debe de ser vacío."))
    (_.nonEmpty)

La tercera operación es realizar la verificación del campo edad. Para realizar dicha
operación, definiremos la función readAge y sus funciones auxiliares. El código es el
siguiente:

def readAge(form: Form): ControlErrorFast[Int] =
  getValue(form)("age")
   .flatMap(nonBlank("age"))
   .flatMap(parseInt("age"))
   .flatMap(nonNegative("age"))

def parseInt(nombre:String)(age:String): ControlErrorFast[Int] =
  Either.catchOnly[NumberFormatException](age.toInt)
   .leftMap(_ => List(s"El campo $nombre debe de ser numérico"))

def nonNegative(nombre:String)(dato:Int): ControlErrorFast[Int] =
  Right(dato)
   .ensure(List(s"El campo $nombre no es válido"))
    (_ >= 0 )

Para finalizar, una vez realizadas las verificaciones de los campos, es necesario realizar la validación del formulario para obtener el listado de los posibles errores presentes en el formulario, o bien, retornar el resultado final.

La validación del formulario se realiza utilizando elementos de tipo Validated y un elemento que trabaje con los contextos de validación; dicho elemento, es un Semigroupal el cual genera una tupla de elementos del mismo contexto.

val formHtml: Form = Map("name" -> "Pepito", "age" -> "40")
val valid1_1:ValidatedForm[String] = Validated.fromEither(readName(formHtml))
val valid1_2:ValidatedForm[Int] = Validated.fromEither(readAge(formHtml))
val resultado1 = (valid1_1, valid1_2).tupled
println(s"resultado1=${resultado1}")

La salida por consola es la siguiente:

resultado1=Valid((Pepito,40))

Para el supuesto de trabajar con un formulario con datos erróneos, el código sería el siguiente:

val formHtmlKO3: Form = Map("name" -> "", "age" -> "-1")
val valid4_1:ValidatedForm[String] = Validated.fromEither(readName(formHtmlKO3))
val valid4_2:ValidatedForm[Int] = Validated.fromEither(readAge(formHtmlKO3))
val resultado4 = (valid4_1, valid4_2).tupled
println(s"resultado4=${resultado4}")
println

La salida por consola es la siguiente:

resultado4=Invalid(List(El campo name no debe de ser vacío., El campo age no es válido))

Como conclusión final, una de las opciones para el control de errores es utilizar el componente Either, pudiendo controlar los valores y las excepciones. Cuando todos los elementos Either son agrupados para su evaluación con Validated y un Semigropal podemos obtener el resultado, o bien, el conjunto de los mensajes de las no validaciones que se han producido en el mismo instante. A diferencia de otros procesos de validación, la ventaja reside en que conocemos todos los fallos producidos en el mismo tiempo.

 

Test unitarios y cobertura de código en Python

En la presente entrada, Test unitarios y cobertura de código en Python, realizaré la descripción de cómo se realizan test unitarios en Python con unittest y, además, cómo se realizan el análisis del código para generar el índice de cobertura de código con la herramienta coverage.

Los ejemplos estarán realizados con la versión 3.6 de Python.

logo-python

Test unitarios

Los test unitarios los definimos utilizando el framework unittest el cual está incorporado en la distribución de la versión del lenguaje.

Para realizar un test de un código, iniciaremos la definición de un código al cual se definirán el conjunto de test a definir. Este proceso inicial lo realizaré para comprender el proceso.

Definiré una clase de utilidad Util con un método statusToCode cuya funcionalidad consistirá en parsear un parámetro alfanumérico de entrada y, como salida, retornará un valor entero.

El snippet de la clase es la siguiente:

class Util:
  """Class utils."""
  @staticmethod
  def statusToCode(code="") -> int:
    """Return exit code"""
    assert len(code) > 0, "Argument not valid"
    result = {
      'UP': 0,
      'WARNING': 1,
      'CRITICAL': 2,
      'UNKOWN': 3,
    }.get(code, 3)
    return result

La clase Util está definida en el módulo util.py dentro de la carpeta lib. La función statusToCode tiene un decorador definido con nombre @staticmethod el cual permite definir el método en la clase con referencia estática para poderlo utilizar sin la necesidad de instanciar la clase.

Los test los definiremos en la carpeta lib_test la cual está definida al mismo nivel que la clase lib. Definiré las clases de test conforme a los módulos definidos. Así, tendremos la clase UtilTest en el módulo test_utils.py de la carpeta lib_test.

La clase UtilTest deberá de importar el módulo unittest y definir la clase heredando de la clase unittest.TestCase para poder realizar los test. Además, deberá de importar la clase con el código que se desea probar. Así, la clase queda definida de la siguiente forma:

import unittest
from lib.utils import Util

class UtilTests(unittest.TestCase):
  def setUp(self):
    pass

  def test_statusToCode_EMPTY(self):
    try:
      print(sys.executable)
      Util.statusToCode("")
    except AssertionError as exception:
      self.assertTrue(exception != None)

  def test_statusToCode_UP(self):
    self.assertEqual(Util.statusToCode("UP"), 0)

  def test_statusToCode_WARNING(self):
    self.assertEqual(Util.statusToCode("WARNING"), 1)

  def test_statusToCode_CRITICAL(self):
    self.assertEqual(Util.statusToCode("CRITICAL"), 2)

  def test_statusToCode_UNKNOWN(self):
    self.assertEqual(Util.statusToCode("UNKNOWN"), 3)

La clases UtilTests presenta seis métodos: el método setUp, el cual realiza la definición de las operaciones previas a la ejecución de los test, en nuestro caso no es necesario realizar ninguna; y, el resto de métodos, que definen los test al tener como prefijo la cadena “test_”.

La verificación de los resultados se realiza empleando la referencia self la cual define las funciones de comprobación.

Cobertura

La cobertura de código la realizaremos con la herramienta coverage cuya referencia la pondremos en el fichero requirements.txt para que sea cargado en el entorno virtual del proyecto.

La herramienta coverage tiene la capacidad de realizar la generación de los informes por línea de comando, o bien, la generación de los informes en formato html; dichos informes, se generarán en la carpeta htmlcov del propio proyecto.

Los comandos que ejecutaremos son los siguientes:

  • coverage erase.- Eliminación de los datos previos de cobertura. Un ejemplo de ejecución en la línea de comandos es el siguiente: coverage erase
  • coverage run.- Arranque del programa Python que recolecta los datos. Un ejemplo de ejecución en la línea de comandos es el siguiente: coverage run –omit=’.tox/*,.venv/*’ -m unittest
  • coverage report.- Generación resultados. Un ejemplo de ejecución en la línea de comandos es el siguiente:  coverage report –omit=’.tox/*,venv/*’ -m
  • coverage html.- Generación de los informer en formato HTML. Un ejemplo de ejecución en la línea de comandos es el siguiente: coverage html –omit=’.tox/*,venv/*’

Un ejemplo de informe de cobertura tiene el siguiente aspecto:

python coverage html

Automatización del proceso de Cobertura

En el apartado anterior, he definido la forma de ejecutar los test y la Generación de los informes de cobertura y, en el presente apartado, realizaré la descripción de cómo lo podemos automatizar.

La automatización la realizamos empleando la herramienta tox(https://tox.readthedocs.io/en/latest/) Para poder utilizar tox, primeramente, es necerios definir en el fichero requirement.txt la herramienta tox; una vez instalado en el entorno virtual, deberemos definir el plan de ejecución de tox el cual se define en el fichero tox.ini ubicado en la carpeta raíz del proyecto.

El aspecto del fichero tox es el siguiente:

[tox]
envlist = py36, coverage-report
skipsdist = True

[testenv]
commands = python -m pytest {posargs}
deps =
-r{toxinidir}/requirements.txt
freezegun==0.3.9
pytest==3.5.0
passenv=*

[testenv:coverage-report]
skip_install = true
commands =
coverage erase
coverage run --omit='.tox/*,.venv/*' -m unittest
coverage report --omit='.tox/*,venv/*' -m
coverage html --omit='.tox/*,venv/*'

El fichero tox está compuesto de tres elementos de configuración los cuáles tienen la siguiente descripción:

  • Elemento tox: Definición del entorno virtual de ejecución, elemento a ejecutar y el flag de generación del artefacto para la distribución
  • Elemento testenv: Definición de la configuración necesaria por tox.
  • Elemento coverage-report: Definición de la secuencia de comandos de la herramienta coverage para el cálculo y generación de informes de cobertura.

Para ejecutar el proceso automático tecleamos en la línea de comando y posicionados en la carpeta de proyecto el comando tox.

Conclusión

El proceso de generación de test unitartios en Python es un proceso parecido a otros lenguajes como Java o Scala. Además, al estár el framework incorporado en la distribución no requiere de ninguna operación de carga, facilitando su uso.

 

Cats I: Mónada Eval

En entrada anteriores, he hablado de cierto tipo de mónada como son la mónada Reader o la mónada Estado con la librería Scalaz. En la entrada de hoy, Cats: Mónada Eval, me centraré en la mónada Eval definida en la librería Cats. La librería Cats es una de las librerías básica del ecosistema de Scala como lo es librería Scalaz.

Tipos de evaluación en Scala

La definición de variables con la palabra reservada val en Scala implica la definición de una variable inmutable. Por otro lado, si se quiere definir una variable mutable, se emplea la palabra reservada var.

Para los valores inmutables, se puede definir qué evaluación tiene una variable; es decir, se puede definir si una variables tiene evaluación inmediata o perezosa; pero, además, se puede definir si el valor es cacheado o no. Así, podemos definir variables de la siguiente forma:

  •  Variable con la palabra reservada val.– Definición de una variable con evaluación inmediata y memorizable.

Un ejemplo es el siguiente:

val x = {
  println("Procesando X")
  Math.random
}
println(x)
println(x)

La salida por consola es la siguiente:

Procesando X
0.49063463192831624
0.49063463192831624
  • Variable con las palabras reservadas lazy y val.- Definición de una variable con evaluación perezosa y memorizable.
lazy val x = {
  println("Procesando X")
  Math.random
}
println(x)
println(x)

La salida por consola es la siguiente:

Procesando X
0.7903293397160105
0.7903293397160105
  • Variable con la palabra reservada def.- Definición de una variable con evaluación perezosa y no memorizable.
def x = {
  println("Procesando X")
  Math.random
}
println(x)
println(x)

La salida por consola es la siguiente:

Procesando X
0.8181569326322171
Procesando X
0.2682764923719232

2.- Mónada Eval. Tipos de evaluación con Cats

La librería Cats define una mónada para controlar las evaluaciones. La mónada es un wrapper de un valor o de una computación que produce un valor. Además, Eval soporta una computación perezona segura para una pila mediante los métodos map y flatMap.

Los tipos de Eval son los siguientes:

  • Eval.now.- La evaluación now es equivalente a val. Un ejemplo es el siguiente:
import cats.Eval
val x = Eval.now{
println("Procesando X")
Math.random
}
println(x.value)
println(x.value)

La salida por consola es la siguiente:

Procesando X
0.8337324158188836
0.8337324158188836
  • Eval.later.- La evaluación later es equivalente a lazy val. Un ejemplo es el siguiente:
import cats.Eval
val x = Eval.later{
  println("Procesando X")
  Math.random
}
println(x.value)
println(x.value)

La salida por consola es la siguiente:

Procesando X
0.8335294714853098
0.8335294714853098
  • Eval.always.- La evaluación always es equivalente a def. Un ejemplo es el siguiente:
val x = Eval.always{
  println("Procesando X")
  Math.random
}
println(x.value)
println(x.value)

La salida por consola es la siguiente:

Procesando X
0.36159399101292466
Procesando X
0.7171589355658571
  •  Otros ejemplos:

Procesamiento de una computación Eval.always y función map

val greeting = Eval
  .always{ println("Paso 1"); "Hello"}
  .map{ str => println("Paso 2"); s"${str} world" }
println(greeting.value)

La salida por consola es la siguiente:

Paso 1
Paso 2
Hello world

Computación de operaciones Eval de tipo now y always en una for comprehension .

val ans = for{
  a <- Eval.now{ println("Calculating A"); 40 }
  b <- Eval.always{ println("Calculating B "); 2 }
}yield{
  println("Sumando A y B")
  a + b
}
println(s"Calculo1=${ans.value}")
println
println(s"Calculo2=${ans.value}")
println

La salida por consola es la siguiente:

Calculating A
Calculating B
Sumando A y B
Calculo1=42
Calculating B
Sumando A y B
Calculo2=42

3.- Otras operaciones

La mónada Eval es interesante su uso en una computación que retorne un valor; pero, hay ocasiones que se necesita cachear un valor o deferir la computación para evitar una excepción de tipo StackOverflow. Para ello, están las funciones memoize y defer.

3.1.- Función memoize.

La función memoize permite el cacheo de un valor, o bien, la cadena de una computación. Un ejemplo es el siguiente:

val saying = Eval
 .always{ println("Paso 1"); "Un gato" }
 .map{ str => println("Paso 2"); s"${str} siéntate" }
 .memoize
 .map{ str => println("Paso 3"); s"${str} la alfombra" }
println(s"Calculo1=${saying.value}")
println
println(s"Calculo2=${saying.value}")

La salida por consola es la siguiente:

Paso 1
Paso 2
Paso 3
Calculo1=Un gato siéntate la alfombra
Paso 3
Calculo2=Un gato siéntate la alfombra

3.2.- Función defer.

Las funciones para la manipulación de valores con Eval es mediante las funciones map y flatMap. Así, con las sucesivas llamadas se van apilando una conjunto de operaciones que, una vez apiladas todos los posibles casos, se realiza el cálculo.

Un ejemplo de este escenario, es el cálculo del factorial con la multiplicación del número n en cada llamada. El código es el siguiente:

def factorial(n: BigInt): Eval[BigInt] ={
  if(n==1){
    Eval.now(n)
  }else{
    factorial(n-1).map( _ * n )
  }
}
println( factorial(50000).value )

La salida por consola sería la siguiente:

Exception in thread "main" java.lang.StackOverflowError

Para evitar esta situación, podemos emplear la función defer con la cual diferimos la ejecución de una expresión que produce un valor de Eval. Es como un flatMap pero seguro.

El ejemplo anterior utilizando la función defer queda como sigue:

def factorial2(n: BigInt): Eval[BigInt] ={
  if(n==1){
    Eval.now(n)
  }else{
    Eval.defer( factorial2(n-1).map( _ * n ) )
  }
}
println( factorial2(50000).value )

La salida por consola sería el resultado del cálculo. Al ser un número muy grande, lo omito.

4.- Función fold con Eval

Para realizar el cálculo de un valor de un ADT es común la utilización de la función fold. En el siguiente ejemplo, se muestra la definición de la función fold y el cálculo de la suma de los elementos de una lista. El código es el siguiente:

def miFoldRightEvalList[A, B](list: List[A], empty: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = list match {
  case Nil => empty
  case head :: tail => Eval.defer( f(head, miFoldRightEvalList(tail, empty)(f) ) )
}

def mifoldRight[A,B](as: List[A], acc:B)(fn: (A,B) => B): B =
  miFoldRightEvalList(as, Eval.now(acc)){
    (a,b) => b.map( fn(a,_) )
  }.value

val miLista2 =  (1 to 10000000).toList
println(s"Suma de la lista= ${mifoldRight( miLista2, 0L)(_+_) } ")
println

La salida por consola es la siguiente:

Suma de la lista= 50000005000000

La mónada Eval nos permite tener una seguridad en la ejecución de una cadena de expresiones evitando el desbordamiento de la pila del sistema.

Patrón Type Class y Spark

En las pasadas entradas centradas en el patrón type class con título Patrón Type Class  y Patrón Type Class: definición de leyes y test, realicé una descripción de la estructura de dicho patrón. En la presente entrada, Patrón Type Class y Spark, me centraré en la definición de un API mediante el patrón type class utilizando Apache Spark.

Apache Spark es aquel motor de procesamiento y análisis de datos. Apache Spark es una solución muy utilizada en el ámbito del Big Data.

El problema del ejercicio solucionada con Apache Spark consiste en extraer los datos existentes en unas tablas de una base de datos, crear un fichero en formato parquet de los datos; y, dicho fichero, dejarlo en una estructura de directorios. Todo ello, lo más configurable posible.

La definición de las tablas fuentes de datos son tablas, con nombre y campos con un enfoque didáctico, es decir, son tablas con nombre no relevantes. Definiré dos tablas: la tabla libros, con los siguientes campos: codigo, nombre y población; y, la tabla dat_country, con los mismos campos.

El tipo básico del ejemplo es aquel tipo que puede trabajar con el valor parametrizado cuya definición es la siguiente: type Postgresql[T] = T. El tipo lo he identificado con Postgres porque es con la base de datos con la cual realizaré el ejercicio.

La definición del fichero sbt del ejecercicio es la siguiente:

import sbt.Keys.libraryDependencies
name := "EjemploTypeClassSpark"
version := "1.0"
scalaVersion := "2.11.9"
scalacOptions += "-Ypartial-unification" // 2.11.9+
val spark_version = "2.3.1" 
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % spark_version,
  "org.apache.spark" %% "spark-sql" % spark_version,
  "com.typesafe" % "config" % "1.3.2",
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

La definición del API con nombre Engine estará formada por una única función con nombre dataExtraction la cual realizará la extración de datos, creación de un fichero en formato parquet y escritura en un path sin datos de retorno, es decir, de tipo Unit. Así, la definición de la parte funcional del type class es la siguiente:

import java.util.{ Properties}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import conf.Configuration
import typeclass.DTO.DataConfiguration
import util.Util
trait Engine[P[_]] {
  def dataExtraction(): P[Unit]
}
object Engine extends EngineInstances with EngineSyntax
trait EngineSyntax{
  object syntax{
    def |-> [P[_]]()(implicit BDI:Engine[P]): P[Unit] = BDI.dataExtraction()
  }
}

En el snippet anterior, definimos los siguientes elementos: el trait Engine del API, la definición del objeto Engine que hereda de las instancias y se comporta como un elemento como EngineSyntax. En este ejercicio, por simplificar, se omite la definición del trait con la definición del lenguaje y las leyes matemáticas que debe de cumplir.

La definición de las instancias con los efectos de lado es la siguiente:

trait EngineInstances{
  def apply[P[_]](implicit BDI:Engine[P]): Engine[P] = BDI
  import typeclass.Types.Postgresql
  implicit object EnginePostgress extends Engine[Postgresql]{
  def logger = LoggerFactory.getLogger(this.getClass)
  def loadTableField(nombreTable:String): List[String] = {
    nombreTable match {
       case "pruebas.libros" => List("codigo","nombre","poblacion")
       case "pruebas.dat_country" => List("codigo","nombre","poblacion")
       case _ => List()
   }
 }
def loadTableToParquet( spark:SparkSession, dataConfiguration:DataConfiguration ): Unit = {
  val engineSpark = spark
    .read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", dataConfiguration.url)
    .option("dbtable", dataConfiguration.nameTable)
    .option("user", dataConfiguration.user )
    .option("password", dataConfiguration.password)
    .option("fetchsize", "1000")
    .load()
    val parameter: List[String] = loadTableField(dataConfiguration.nameTable) //: _*
    engineSpark.select( parameter.toSeq.head, parameter.tail.toSeq: _* ).write.format("parquet").save( Util.getFilePathTarget(dataConfiguration.nameBucket ,dataConfiguration.nameTable)   )
}
def runSpark(url:String, user:String, password:String, numTables:Int, prop:Properties): Either[String, Unit] = {
  val spark = SparkSession.builder.appName("EjemploTypeClassSpark").getOrCreate()
  for(num <- 1 to numTables){
    try{
      val nameTable = prop.getProperty("table_" + num)
      if(!nameTable.equals(null) && !nameTable.equals("")){
        logger.info(s"[*****] Número tabla: ${num}, nombre tabla: ${nameTable}.")
        loadTableToParquet(spark, DataConfiguration(numTables=num, nameTable=nameTable, url=url, user=user, password=password, nameBucket = prop.getProperty("bucketName")) )
      }else{
        logger.info(s"[*****] Número tabla: table_${num} VACÍA")
      }
   } catch {
     case ex: Exception => {
       logger.info(s"[*****] Error en la carga de la tabla con número table_${num}")
       logger.error(s"Exception: ${ex.getMessage}")
     }
   }
 }
 spark.close()
 Right(Unit)
}
override def dataExtraction(): Postgresql[Unit] = {
  for{
    environment <- Configuration.loadEnvironmentVariables.right
    properties <- Configuration.loadProperties(environment.configuration).right
    _ <- runSpark(environment.url, environment.user , environment.password, properties.numTables, properties.properties ).right
  }yield{
    Right(Unit)
  }
 }
}
[...]
}

En el snippet anterior, definimos un trait EngineInstances con un constructor y un objeto implícito EnginePostgress el cual hereda del API Engine. En este trait, si definimos otro tipo de parámetro, como por ejemplo:Either[String,T] definido como type Oracle[T] = Either[String,T], sería el lugar para su implementación.

El objeto EnginePostgress define la función dataExtraction la cual define el programa con las siguientes sentencias: primero, larga de variables de entorno; segunda, carga de la configuración de los ficheros de properties; tercero, las operaciones con Spark; y, por último, el retorno de un elemento de tipo Unit. Las tres funciones son funciones que retornan un contenedor binario de tipo Either.

La función runSpark es aquella función que realiza la constructuctión de una sesión de Spark para realizar una operación sobre una tabla configurada; dicha función, se realiza en la función loadTableToParquet.

La función loadTableToParquet es aquella función que realiza la carga de la configuración del motor Spark para una tabla de una base de datos determinada, extrae los datos en formato parquet y deja el fichero en una ubicación determinada.

La aplicación cliente del API Engine es la siguiente:

import typeclass.Engine.syntax._
import typeclass.Types.Postgresql
object App extends App{
  val startTime: Long = System.currentTimeMillis()
  |->[Postgresql]()
}

El fichero properties con la configuración con el nombre de las tablas, el número de tablas a extraer y el path con el directorio en donde se almacena el resultado es el siguiente:

bucketName=~/tmp/
num_tables=2
table_1=pruebas.libros
table_2=pruebas.dat_country

La variables de entorno a configurar para la ejecución de la aplicación son las siguientes:

export url=jdbc:postgresql://localhost:5432/prueba
export user=postgres
export password=password
export configuration=~/workspace/EjemploTypeClassSpark/src/main/resources/configuration_bdi.properties

Los componentes software para la carga de las variables de entorno y carga de ficheros properties nos las describo en la entrada para reducir el tamaño y por centrar el ejemplo en el patrón type class.

Para finalizar y poder ejecutar la aplicación en local con Apache Spark se ejecuta con el siguiente comando:

cd ~/scala/spark-2.3.1-bin-hadoop2.7/bin
spark-submit --driver-class-path postgresql-42.1.4.jre6.jar --class "App" --master local[2] ~/workspace/EjemploTypeClassSpark/target/scala-2.11/ejemplotypeclassspark_2.11-1.0.jar

El resultado de la ejecución es la creación del fichero con los datos extraídos en la carpeta ~/tmp/ del sistema de directorios.

Patrón Type Class: definición de leyes y test

En la entrada anterior, Patrón Type Class , realicé la definición, descripción y mostré ejemplos del patrón type class. La estructura del patrón está compuesta por un conjunto de elementos trait y un objeto que se comporta como dichos trait. A este objeto, para ciertos tipos de datos, se pueden definir leyes matemáticas para poder realizar test de dicho componente.

En la presente entrada, Patrón Type Class: definición de leyes y test , realizaré la definición del type class monoiede para la operación lógica suma y producto. Para ello, definiré un Type Class con la estructura definida en la anterior entrada añadiendo la definición de las leyes.

Definición de Monoide

En la serie que llevo publicado de la librería Scalaz, publiqué un post con título Scalaz IV: Tipos etiquetados, propiedad asociativa y monoides , en donde se describe el concepto de Monoide, así como, la descripción de unos ejemplos.

En la programación funcional, aparecen escenarios en donde es necesario definir funciones binarias cuyos parámetros de entrada y de salida son del mismo tipo; este tipo de función, se define en la entidad Semigrupos. Desde un punto de vista del lenguaje Scala, un Semigrupo se define de la siguiente forma:

trait Semigroup[A]{
  def combine(x:A, y:A):A
}

El Semigrupo lo definimos con un trait que recibe un tipo parametrizado A y define una función combine cuyos dos parámetros de entrada y la salida son del mismo tipo.

Unos ejemplo matemáticos que representan el Semigroup[A] pueden ser los siguientes:

  1.  1 + 2
  2.  2 + 1
  3. 1 + (2 + 3)
  4. (1 + 2) + 3
  5. (1 * 2) * 3
  6. 1 * (2 * 3)

Como deducimos de los ejemplos anteriores: podemos decir que se cumple la propiedad asociativa, comparando los resultados de los puntos de los ejemplo 1-2, 2-3 y 5-6. Así, podemos afirmar que Semigroup[A] cumple la propiedad asociativa; pero, en función de la operación que se aplique, puede no cumplir esta regla; por ejemplo, la operación resta, no cumple la propiedad asociativa. Unos ejemplos pueden ser los siguientes:

  1. 1 – (2 – 3)
  2. (1 – 2) – 3

De los ejemplos anteriores de la operación resta, el resultado de las operaciones es distinto en los ejemplos del punto 1 y 2.

Las operaciones matemáticas pueden cumplir otras propiedades; como por ejemplo, la propiedad de identidad. La propiedad de identidad necesita un valor vacío o valor zero el cual, en función de la operación, el valor del elemento no varía; por ejemplo: en la operación de suma, el valor zero o vacío es el valor 0; y, para la operación de multiplicación, el valor zero o vacío es el valor 1. Unos ejemplos con el entero 2 y aplicando el elemento vacío, cuyo resultado no cambia el entero, son los siguientes:

  1. Operación suma por la izquierda: 2 + 0 = 2
  2. Operación suma por la derecha: 0 + 2 = 2
  3. Operación multiplicación por la izquierda: 2 * 1 = 2
  4. Operación multiplicación por la derecha: 1 * 2 = 2

Llegado a este punto, los escenarios de conjuntos de elementos en donde se definen unas funciones que cumple las propiedades de asociatividad e identidad, son definidos como Monoides. La definición de Monoide en lenguaje Scala es la siguiente:

trait Monoid[A] extends Semigroup[A]{
  def empty: A
}

En los siguientes apartados, realizaré la descripción de monoides para las funciones lógicas suma (OR) y multiplicación (AND).

Monoide: operación lógica suma (OR)

La definición del Type class de la operación lógica suma (OR) en lenguaje Scala es la siguiente:

trait MonoidSuma[A] extends Monoid[A]{}
object MonoidSuma extends MonoidSumaInstances with MonoidSumaSyntax with MonoidSumaLaws
trait MonoidSumaInstances{
  def apply[A](implicit monoid: MonoidSuma[A]) = monoid
  implicit val monoidBooleanSuma = new MonoidSuma[Boolean] {
    // 1-FORMA
    // override def combine(x: Boolean, y: Boolean): Boolean = (x, y) match{
    // case (true, true ) => true
    // case (true, false) => true
    // case (false, true) => true
    // case (false, false) => false
    // }
    override def combine(x: Boolean, y: Boolean): Boolean = x || y
    override def empty: Boolean = false
  }
}
trait MonoidSumaSyntax{
  object syntax{
    def ++++[A](a:A, b:A)(implicit monoide: MonoidSuma[A]) = monoide.combine(a,b)
    def emptySuma [A](implicit monoide: MonoidSuma[A]) = monoide.empty
  }
}
trait MonoidSumaLaws{
  import MonoidSuma.syntax._
  trait Laws[A]{
    implicit val instance: MonoidSuma[A]
    def asociatividad(a1:A, a2:A, a3:A): Boolean = ++++( ++++(a1,a2), a3) == ++++(a1, ++++(a2,a3) )
    def izquierdaIdentidad(a1:A): Boolean = ++++( a1, emptySuma ) == a1
    def derechaIdentidad(a1:A): Boolean = ++++( emptySuma, a1 ) == a1
  }
  object Laws{
    def apply[A](implicit monoide:MonoidSuma[A]) = new Laws[A] {
      implicit val instance: MonoidSuma[A] = monoide // Dfinición de la referencia del trait.
    }
  }
}

La estructura del type class es la siguiente: trait MonoidSuma, objeto MonoidSuma, trait MonoidSumaInstances, trait MonoidSumaSyntax y MonoidSumaLaws. De la estructura de type class descrita en la entrada anterior, aparece el elemento nuevo trait MonoidSumaLaws en donde se define las funciones con las propiedades matemáticas de asociatividad y de identidad, así como, el objeto con el constructor del monoide.

Para realizar las pruebas del type class de la operación suma es necesario probar las leyes del type class y, para realizar las pruebas, se definen unos test que verifican las leyes matemáticas del type class los cuáles, para el type class función suma lógica, es el siguiente:

class MyMonoidTest extends FlatSpec with Matchers{
  "Test de las leyes del Monoide lógico Suma" should "cumple las leyes asociativas y de identidad" in {
    import es.ams.cap2monoidsemigroup.MonoidSuma.Laws
    val laws = Laws.apply
    assert( laws.asociatividad( true, false, true) == true)
    assert( laws.izquierdaIdentidad(true) == true )
    assert( laws.derechaIdentidad(true) == true )
  }
}

Monoide: operación lógica producto (AND)

La definición del Type class de la operación lógica producto es la siguiente:

trait MonoidProducto[A] extends Monoid[A]{}
object MonoidProducto extends MonoidProductoInstances with MonoidProductoSyntax with MonoidProductoLaws
trait MonoidProductoInstances{
  def apply[A](implicit monoid: MonoidProducto[A]) = monoid
  implicit val monoidProductoSuma = new MonoidProducto[Boolean] {
    // 1-FORMA
    // override def combine(x: Boolean, y: Boolean): Boolean = (x, y) match{
    // case (true, true ) => true
    // case (true, false) => false
    // case (false, true) => false
    // case (false, false) => false
    // }
    override def combine(x: Boolean, y: Boolean): Boolean = x && y
    override def empty: Boolean = true
  }
}
trait MonoidProductoSyntax{
  object syntax{
    def ****[A](a:A, b:A)(implicit monoide: MonoidProducto[A]) = monoide.combine(a,b)
    def emptyProducto [A](implicit monoide: MonoidProducto[A]) = monoide.empty
  }
}
trait MonoidProductoLaws{
  import MonoidProducto.syntax._
  trait Laws[A]{
    implicit val instance : MonoidProducto[A]
    def asociatividad(a1:A, a2:A, a3:A):Boolean = ****( ****(a1, a2), a3) == ****( a2, ****(a2, a3))
    def izquierdaIdentidad(a1:A):Boolean = ****(a1, emptyProducto) == a1
    def derechaIdentidad(a1:A):Boolean = ****(emptyProducto, a1) == a1
  }
  object Laws{
    def apply[A](implicit monoide:MonoidProducto[A]) = new Laws[A]{
      implicit val instance: MonoidProducto[A] = monoide
    }
  }
}

La estructura del type class es la siguiente: trait MonoidProducto, objeto MonoidProducto, trait MonoidProductoSyntax, trait MonoidProductoLaws y MonoidProductoLaws. De la estructura de type class descrita en la entrada anterior, aparece el elemento nuevo trait MonoidProductoLaws en donde se definen las funciones con las propiedades matemáticas de asociatividad y de identidad, así como, el objeto con el constructor del monoide.

Para realizar las pruebas del type class es necesario probar las leyes y, para realizar las pruebas, se define un test que verifican las leyes matemáticas del type class los cuáles, para el type class función producto lógica, es el siguiente:

class MyMonoidTest extends FlatSpec with Matchers{
  "Test de las leyes del Monoide lógico Producto" should "cumple las leyes asociativas y de identidad" in {
    import es.ams.cap2monoidsemigroup.MonoidProducto.Laws
    val laws = Laws.apply
    assert( laws.asociatividad( true, false, true) == true)
    assert( laws.izquierdaIdentidad(true) == true )
    assert( laws.derechaIdentidad(true) == true )
  }
}

La definición de las leyes se realiza utilizando las funciones definidas en la sintaxis y referenciando a los elementos implícitos de las instancias. ScalaTest es el framework seleccionado para la definición y ejecución de los test definidos de los type class.