Mosquitto

Mosquitto es un broker de mensajería Open Source(licencia EPL/EDL) que implementa el protocolo MQTT para las versiones 3.1 y 3.1.1. Mosquitto es parte de Eclipse foundation y es un proyecto de Eclipse. Mosquitto es ligero y es adecuado para todos los dispositivos, desde computadoras de baja potencia hasta servidores complejos.

El protocolo MQTT (MQ Telemetry Transport or Message Queuing Telemetry Transport) es un estándar ISO basado en un método sencillo para realizar intercambio de mensajes conforme el modelo productor-consumidor en sistemas de Internet de las cosas ( IoT – Internet of the Thinks). Los dispositivos que se conectan pueden ser sensores, dispositivos móviles, microcontroladores o sistemas embebidos.

El sketchnote de la entrada es el definido en la siguiente imagen:

Instalación

La descripción del proceso de instalación está basada en plataformas Linux; pero, para el resto de plataformas (Windows, Mac,…), el proceso de instalación es el típico para dichas plataformas. La información de instalación para cada plataforma, se encuentra en el apartado Download de la documentación oficial.

Los comandos a ejecutar para la instalación de Mosquitto son los siguientes:

sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
sudo apt-get update
sudo apt-get install python-software-properties
sudo apt-get install mosquitto

Estructura de carpetas

El proceso de instalación nos creará una estructura de directorios como la siguiente:

  • /etc/mosquitto.- Carpeta con los ficheros de configuración del broker. El contenido del directorio es el siguiente:

-> directorio ca_certificates.-Directorio de certificados

-> certs.- Directorio de certificados

-> conf.d .- Directorio de configuración

-> mosquitto.conf .- Fichero de configuración

  • /usr/sbin .- En esta carpeta se localiza el fichero de arranque mosquitto.sh
  • /var/log/mosquitto .- Directorio en el que se encuentra el fichero mosquitto.log

Inicio del broker

Para iniciar el broker de mensajería Mosquitto es necesario la ejecución del comando mosquitto identificando la ubicación del fichero de configuración. El comando de ejecución del broker es el siguiente:

sudo /usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf

El flag -c indica al comando mosquitto la ubicación del fichero de configuración mosquitto.conf. Si se desea conocer la información del comando, es decir, la ayuda del comando, se puede ejecutar el comando:

/usr/sbin/mosquitto -h

El resultado de la ejecución del comando, será que el broker Mosquitto está ejecutándose en la máquina en el puerto por defecto 1883.

Cliente del Broker

Una vez instalado e iniciado el broker es necesario definir el mecanismo por el cual poder conectarse y realizar una comunicación. Para realizar dichas operaciones, es necesario la instalación de un cliente específico MQTT; dicho cliente, servirá para la realización de simulaciones de dispositivos MQTT. El nombre del dispositivo es mosquitto-clients y, el comando de instalación en la plataforma Linux, es el siguiente:

sudo apt-get install mosquitto-clients

El cliente instalado es aquel cliente que permite simular el funcionamiento de los productores, entidades que generan información MQTT; y, consumidores, entidades que reciben los datos suministrados por los productores.

Para el broker Mosquitto, los productores de datos se denominan publicadores y, los que consumen los datos, subscriptores.

Topic

Llegado a este punto, disponemos del broker MQTT instalado y ejecutándose; tenemos instalado el cliente para definir los publicadores y los subscriptores; pero, no tenemos definido el mecanismos de interconexión entre los clientes. El mecanismo en cuestión son los topics. Los topics son aquellos buzones en los cuales los publicadores dejan sus mensajes. Así, un publicador dejan en un topic aquel mensaje que quiere transmitir al subscritor; y, el subscriptor, en el instante de la publicación de un mensaje, recibe dicho mensaje.

Como he comentado antes, los dispositivos que se pueden conectar son múltiples: sensores, sistemas embedidos, dispositivos móviles; con lo cual, es necesario una organización de todos los elememtos. Así, para el supuesto de un sistema IoT de los sensores de una casa, podemos definir los dispositivos en una estructura jerárquica de la siguiente forma:

casa

habitaciones

hab1

luz: sensor de luz

temperatura: sensor de temperatura

hab2

luz: sensor de luz

temperatura: sensor de temperatura

trastero

luz: sensor de luz

temperatura: sensor de temperatura

humedad: sensor de humedad

Dada la estructura anterior, definimos los siguientes topic como ejemplos en el broker de la siguiente manera:

  • casa/habitaciones/hab1/luz.- Topic del sensor de luz de la habitación 1
  • casa/habitaciones/hab2/luz.- Topic del sensor de luz de la habitación 2
  • casa/trastero/humedad.- Topic del sensor de humedad del Trastero.

Ejemplos

Los clientes del broker MQTT Mosquitto pueden ser: subscriptores, entidades que reciben datos de sensores; y, publicadores, los sensores que envían datos al broker. Para realizar la simulación desde consola, utilizamos los comandos siguientes comandos: mosquitto_sub, comando para la simulación de subscriptores; y, mosquitto_pub, comando para la simulación de sensores.

En los siguientes apartados, describiré la ejecución de cada uno de los comados.

Subscriptor

El comando para la simulación de subscriptores es el comando mosquitto_sub. Un ejemplo básico de subscripción en el topic «casa/habitaciones/hab1/luz» es el siguiente:

mosquitto_sub -t "casa/habitaciones/hab1/luz" -v

El flag -t identifica el topic de subscripción y, el flag -v, indica que sea verboso. Para visualizar la ayuda del comando se ejecuta el comando con los flag –help.

Publicador

El comando para la simulación de publicadores de mensajes es el comando mosquitto_pub. En ejemplo básico de publicación del mensaje «ON» en el topic «casa/habitaciones/hab1/luz» es el siguiente:

mosquitto_pub -t "casa/habitaciones/hab1/luz" -m "ON"

El flag -t identifica el topic de subscripción y, el flag -m, identifica el mensaje a enviar al broker.Para visualizar la ayuda del comando se ejecuta el comando con los flag –help.

La publicación del mensaje «ON» produce la salida por consola de subscriptor. Así, la salida de la consola del subscriptor es la siguiente:

casa/habitaciones/hab1/luz ON

Conclusiones

El broker MQTT Mosquitto es un broker ligero y sencillo de utilizar. En la presente entrada, me he centrado en la instalación y casos de uso básicos, no he tratado en esta entrada los aspectos orientados a la seguridad los cuáles en los sistemas IoT son importantes.

Desde un punto de vista genérico, el funcionamiento de Mosquitto es el típico de un broker que implementa un modelo productor/consumidor.

En la siguiente entrada, me centraré en la utilización del broker Mosquitto desde una aplicación en Python con el cliente paho-mqtt.

Circe IV: ópticas

Finalizo la serie de Circe con la presente entrada, Circe IV: ópticas, en la cual me centraré en cómo Circe emplea componentes ópticos para facilitar su funcionalidad. Circe no implementa ópticas, sino que se ayuda de la librería Monocle para implementar esta funcionalidad.

 

El sketchnote de la presente entrada, queda descrito en la siguiente imagen:

Para el lector interesado en la librería óptica Monocle, puede acceder a los siguientes enlaces de la librería Monocle que tengo publicados:

El primer paso a realizar es definir una estructura JSON de prueba con lo cual realizar la comparativa entre el modo de trabajo sin ópticas y con ópticas. El JSON de pruebas es el siguiente:

 import cats.syntax.either._
 import io.circe._
 import io.circe.parser._
 val json: Json = parse(
 """
 {
   "order": {
   "customer": {
   "name": "Custy McCustomer",
   "contactDetails": {
     "address": "1 Fake Street, London, England",
     "phone": "0123-456-789"
   }
 },
 "items": [{
   "id": 123,
   "description": "banana",
   "quantity": 10
  }, {
      "id": 456,
      "description": "apple",
      "quantity": 20
    }],
    "total": 123.45
  }
 }
 """).getOrElse(Json.Null)

Acceso a datos en Circe sin ópticas

Como he descrito en las entradas anteriores al tema, el acceso a los datos se realiza con un cursor, la función downField y get entre otros. Para refrescar los conceptos, en el siguiente snippet se definen dos ejemplos: el primero, acceso al campo Phone del JSON de prueba; y, el segundo, acceso a los valores de un array del JSON de prueba. El código es el siguiente:

 println(s"[*] Número de teléfono del cliente(NO ÓPTICA): 
     ${json.hcursor.downField("order")
        .downField("customer")
        .downField("contactDetails")
        .get[String]("phone").toOption}")
 val items: Vector[Json] = json.hcursor.downField("order").downField("items").
 focus.flatMap(_.asArray).
 getOrElse(Vector.empty)
 val quantities: Vector[Int] = items.flatMap( _.hcursor.get[Int]("quantity").toOption )
 println(s"[*] Obtención de un Array del JSON=${quantities} ")

La salida por consola es la siguiente:

 [*] Número de teléfono del cliente(NO ÓPTICA): Some(0123-456-789)
 [*] Obtención de un Array del JSON=Vector(10, 20)

Acceso a datos en Circe con ópticas

La utilización de ópticas supone la definición de un regla de acceso para cada campo. Así, tenemos que definir para cada campo una óptica. Para los ejemplos del apartado anterior, definimos las ópticas para el campo phone y el array quantity de la siguiente forma:

 import io.circe.optics.JsonPath._
 val _phoneNum = root.order.customer.contactDetails.phone.string
 println(s"Número de teléfono del cliente (ÓPTICA): ${_phoneNum.getOption(json)}")
 val items: List[Int] = root.order.items.each.quantity.int.getAll(json)
 println(s"Número de teléfono del cliente (ÓPTICA): ${items}")

La salida por consola es la siguiente:

 Número de teléfono del cliente (ÓPTICA): Some(0123-456-789)
 Número de teléfono del cliente (ÓPTICA): List(10, 20)

Modificación de datos en Circe con ópticas

Para realizar la moficación de un campo, se emplea la función modify de la óptica de aquel campo a modificar. Para realizar la modificación del campo quantity, se realiza de la siguiente forma:

 import io.circe.optics.JsonPath._
 import io.circe._
 val doubleQuantities: Json => Json = root.order.items.each.quantity.int.modify(_ * 2)
 println(s"Multiplicación del campo quantity x2= ${doubleQuantities(json)} ")

La salida por consola es la siguiente:

 Multiplicación del campo quantity x2= {
   "order" : {
   "customer" : {
   "name" : "Custy McCustomer",
   "contactDetails" : {
     "address" : "1 Fake Street, London, England",
     "phone" : "0123-456-789"
   }
 },
  "items" : [
   {
     "id" : 123,
     "description" : "banana",
     "quantity" : 20
   },
   {
     "id" : 456,
     "description" : "apple",
     "quantity" : 40
   }
  ],
   "total" : 123.45
  }
 }

Llegado a este punto, podemos llegar a la conclusión final que la operativa con estructuras JSON con la librería Circe es una tarea sencilla y de fácil aprendizaje: el acceso, modificación y transformación de JSON a una case class o viceversa, son tareas con una pequeña complejidad. Además, si se conoce el funcionamiento de las librerías ópticas, el acceso y manipulación de JSON es más sencillo aún.

Para el lector interesado, el conjunto de las entradas de la librería Circe son las siguientes:

Luigi

En el mundo Big Data y BI, los ETL son unas de las operaciones iniciales para preparar los datos para las tareas de inteligencia; con dichos datos, una vez cargados, se realizan las operaciones para la ayuda en la toma de decisiones. En la presente entrada, «Luigi», defino qué es un ETL y presentar una solución para realizar ETL`s con la solución Luigi.

Definimos un ETL, según la definición en wikipedia, como aquel proceso de extracción, transformación y carga de datos de una fuente de datos a una base da datos, data mart o data warehouse.

La solución tecnológica para la implementación de ETL que describiré en la presente entrada es Luigi  la cual pertenece al contexto del lenguaje Python. Luigi es un paquete de Python el cual permite la construcción de trabajos batch complejos, visualización del estado de procesamiento y encadenamientos de tareas. Luigi es una solución que ha sido adaptada por empresas como Spotify o Red Hat.

El scketchnote de la entrada es el representado en la siguiente imagen:

Instalación

El proceso de instalación de del paquete Luigi es el típico de instalación de un paquete en Python mediante la herramienta pip. Así, el comando de instalación es el siguiente:

pip install luigi

Elementos

Los elementos básicos que intervienen en la definición de un flujo de proceso en Luigi son tres: target, corresponde a una fuente de entrada o salida de datos; un task, tarea de procesamiento; y, parameter, correspondiente a los parámetros de intercambio entre las distintas tareas del flujo de trabajo.

Target

Un Target es aquella clase abstracta que implementa un punto de entrada o salida del flujo; por ejemplo, puede ser: un fichero, una base de datos, un bucket en AWS S3,…

Un Target está compuesta por una jerarquía de clases, como por ejemplo: LocalTarget, S3Target, MySqlTarget, RedshiftTarget; en definitiva, son clases específicas para aquella fuente de datos con las que un flujo trabaja.

Task

Un Task es aquella unidad computacional que realiza una operación con unos datos determinados. Task es una clase abstracta cuyas clases hijas deben de implementar unos métodos básicos: require, para definir las dependencias con otras tareas; output, para definir el resultado del procesamiento; y, por último, run, para definir el procesamiento propiamente dicho.

Parameter

Un Parameter son aquellos elementos de entrada de una tarea. Un Parameter, de la misma forma que los elementos anteriores, es una clase abstracta.

Ejemplos

Operación Suma

La operación de suma consiste en definir una tarea que recibiendo dos parámetros enteros como entrada realice la suma de los mismos. Para realizarlo, definiremos un fichero Python tasksuma.py con el siguiente contenido:

import luigi
class Sumatorio(luigi.Task):
  x = luigi.IntParameter(default=10)
  y = luigi.IntParameter(default=45)

 def run(self):
     print("[***]")
     print("[***] Resultado de la suma:", self.x + self.y)
     print("[***]")

El contenido consiste en la definición de una clase Sumatorio que hereda de la clase Task. La clase Sumatorio recibe dos parámetros enteros; dichos parámetros, tienen unos valores por defecto en el caso de que no existan los valores de entrada. El método run realiza la operación de suma y la escritura por consola.

Para la ejecución de la tarea, en la carpeta raíz del proyecto Python, ejecutamos el siguiente comando:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --x 69 --y 10 --local-scheduler

La salida por consola contiene el resultado de la ejecución de la tarea y las trazas del procesamiento. Así, sin entrar en detalle de todos los mensajes, la salida es la siguiente:

INFO: Informed scheduler that task Sumatorio_69_10_0543bfea4e has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) running Sumatorio(x=69, y=10)
[***]
[***] Resultado de la suma: 79
[***]
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) done Sumatorio(x=69, y=10)
[...]
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
 - 1 Sumatorio(x=69, y=10)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

Si no pasamos los valores enteros por parámetro a la tarea, el comando de ejecución sería el siguiente:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --local-scheduler

La salida por consola sería parecida a la anterior, residiendo la diferencia en los valores numéricos de los campos que serían los valores por defecto.

Operaciones anidadas

Todo proceso de transformación requiere la realización de varias actividades y, en muchas ocasiones, esas actividades tienen que ser realizadas por tareas encadenadas. En el siguiente ejemplo, se realizan dos tareas encadenadas definidas en el fichero Python task_anidadas.py

La primera tarea, clase GenerateWords, realiza la creación de un fichero palabras.txt con el contenido de una lista de palabras; y, la segunda tarea, clase CountLetters, realiza la lectura del fichero generado en la tarea anterior, contabiliza la lectura longitud de las palabras y, como resultado, crea un fichero con la palabra y su longitud.

La clase GenerateWords se define de la siguiente forma:

class GenerateWords(luigi.Task):
 def output(self):
    return luigi.LocalTarget('/tmp/LuigiEjem1/palabras.txt')

 def run(self):
    palabras = ['manzana', 'plátano', 'mandarina']
    with self.output().open('w') as f:
      for palabra in palabras:
      f.write('{word}\n'.format(word=palabra))

La clase GenerateWords está compuesta de dos métodos: el método output, en el cual se define el target de salida, es decir, se define el fichero de salida del procesamiento; y, en el método run, se realiza la escritura en el fichero definido en el método output el contenido de la lista palabras.

La clase CountLetters se define de la siguiente forma:

class CountLetters(luigi.Task):
 def requires(self):
   return GenerateWords()

 def output(self):
   return luigi.LocalTarget('/tmp/LuigiEjem1/contador_letras.txt')

 def run(self):
   print("[***]")
   print("[***]self.input().open('r')=", self.input().open('r'))
   print("[***]")
   with self.input().open('r') as ficheroEntrada:
   palabras = ficheroEntrada.read().splitlines()
   with self.output().open('w') as ficheroSalida:
      for palabra in palabras:
      ficheroSalida.write(
       '{word} | {letter_count}\n'.format(
       word=palabra,
       letter_count=len(palabra)
      )
     )

La clase CountLetters está compuesta de tres métodos: requires, método en el que se define la dependencia con la tarea definida en la clase GenerateWords; método output, método en el que se define el target de salida, es decir, el fichero contador_letras; y, finalmente, método run, método que realiza la lectura del fichero generado por la tarea GenerateWord, contabilización de las longitudes de las palabras existentes y escritura en el fichero de salida contador_letras.txt

Para la ejecución del proceso definido por las anteriores tareas, se ejecuta el siguiente comando en la carpeta raíz del proyecto:

PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler

El resultado es la creación en la carpeta ‘/tmp/LuigiEjem1’ de dos ficheros: palabras.txt y contador_letras.txt. El contenido del fichero palabras.txt es el siguiente:

manzana
plátano
mandarina

El contenido del fichero contador_letras.txt es el siguiente:

manzana | 7
plátano | 7
mandarina | 9

4.- Visualización

Para la visualización del estado de ejecución, Luigi posee una herramienta gráfica en la cual se muestra la información del estado de ejecución. Esta herramienta de visualización es un servidor que hay que arrancar; para ello, se emplea el siguiente comando:

luigid --background --port=8082 --logdir=logs

El aspecto del interfaz es el siguiente:

La información visual que muestra en el GUI es relacionada con las tareas que son ejecutas, el estado en el que se encuentra cada una de ellas y las dependencias gráficas entre cada una.

Para que el servidor monitorice el estado de ejecución de cada tarea, es necesario que en el arranque se especifique que el servidor controlará su ejecución. Así, para poderlo realizar y basándonos en los ejemplos anteriores, es necesario eliminar el flag –local-scheduler. Desde un punto de vista práctico, los escenarios son los siguientes:

  • Ejecución en local de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler
  • Ejecución en el servidor Luigi de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters

Conclusiones

Los ETL se basan en procesos de datos simples: lectura de fuente de datos, transformación y escritura en un repositorio de datos destinos. Dichos procesos, en función de la complejidad del negocio, su nivel de dificultad puede variar.

Luigi es una solución del ecosistema de soluciones del lenguaje Python; pero, mientras que aprendía Luigi, me he acortado por el parecido funcionamiento a SpringBatch, solución propuesta por Spring Framework en el ecosistema Java. Para el lector interesado, publiqué hace tiempo unas entradas sobre SpringBatch

Desde mi punto de vista, la dificultad no reside en la solución tecnológica, la dificultad reside en determinar qué datos son los que se tienen que cargar y qué transformaciones se tienen que aplicar a los datos.

Circe III: codificacores y decodificadores

Continúo con la serie de entradas de la serie sobre la librería Circe. En la presente entrada, Circe III: codificacores y decodificadores, me centraré en los codificadores y decodificadores de Circe los cuáles se definen como type classes.

El sketchnote de la presente entrada, queda descrito en la siguiente imagen:

Un codificador, Encoder[A], contiene una función que convierte un elemento de tipo A en un JSON; y, un decodificador, Decoder[A], realiza la función inversa de un JSON de un tipo A. La librería Circe contiene instancias implícitas de estas type classes para muchos tipos de scala como Int, String, List[A], Option[A] y otros.

Operaciones básicas

Para realizar las operaciones básicas es necesario importar los elementos de los siguientes paquetes:

 import io.circe.parser.decode
 import io.circe.syntax._
  • Para codificar una lista en JSON podemos utilizar la función asJson, o bien, as[TipoDato]. Unos ejemplos ilustrativos son los siguientes:
 val intsJson = List(1, 2, 3).asJson
 println(s"[-] Codificación usando la sintaxis: List->JSON=${List(1, 2, 3).asJson}")
 println(s"[-] Decodificación usando la sintaxis de Json->List[Int]=${intsJson.as[List[Int]]}")

La salida por consola es la siguiente:

 [-] Codificación usando la sintaxis: List->JSON=[
   1,
   2,
   3
 ]
 [-] Decodificación usando la sintaxis de Json->List[Int]=Right(List(1, 2, 3))
  • Para decodificar una estructura JSON a un tipo determinado, se emplea la función decode. Unos ejemplos ilustrativos son los siguientes:
 println(s"[-] Decodificación String->List[Int]=${decode[List[Int]]("[1, 2, 3]")}")
 println(s"[-] Decodificación String->Seq[Int]=${decode[Seq[Int]]("[1, 2, 3]")}")
 println(s"[-] Decodificación String->List[Option[Int]]=${decode[List[Option[Int]]]("[1, 2, 3]")}")

La salida por consola es la siguiente:

 [-] Decodificación String->List[Int]=Right(List(1, 2, 3))
 [-] Decodificación String->Seq[Int]=Right(List(1, 2, 3))
 [-] Decodificación String->List[Option[Int]]=Right(List(Some(1), Some(2), Some(3)))

Operaciones semiautomáticas

En ciertos momentos es necesario tener definidos Encoder y Decoder para una case class determinada. Para ello, utilizamos los componentes semiautomáticos y los importamos de la siguiente manera:

 import io.circe._
 import io.circe.generic.semiauto._

Definimos una case class con nombre Foo de prueba como sigue:

 case class Foo(a: Int, b: String, c: Boolean)

Definimos los codificadores y decodificadores para la case class de forma implícita de la siguiente forma:

 implicit val fooDecoder: Decoder[Foo] = deriveDecoder[Foo]
 implicit val fooEncoder: Encoder[Foo] = deriveEncoder[Foo]

La operación de codificar la case class Foo a un JSON y su proceso contrario es el siguiente:

 println(s"[*] Encoder Foo->Json =${fooEncoder(Foo(a = 1, b = "b", c = true))}")
 val jsonFoo = fooEncoder(Foo(a = 2, b = "bbb", c = false))
 val cursor: HCursor = jsonFoo.hcursor
 println(s"[*] Decoder Json->Foo =${fooDecoder(cursor)}")

La salida por consola es la siguiente:

 [*] Encoder Foo->Json ={
   "a" : 1,
   "b" : "b",
   "c" : true
 }
 [*] Decoder Json->Foo =Right(Foo(2,bbb,false))

Operaciones con anotaciones

La librería Circe contiene unas anotaciones que permite realizar operaciones de codificación, permitiendo la codificación de un case class a JSON. La anotación se encuentra definida en el siguiente paquete io.circe.generic.JsonCodec. Así, un ejemplo de codificación de case class a JSON es el siguiente:

 import io.circe.syntax._
 import io.circe.generic.JsonCodec
 @JsonCodec case class Bar(i: Int, s: String)
 println(s"[-] Conversión Bar->JSON=${Bar(i = 1, s = "Prueba").asJson}")

La salida por consola es la siguiente:

 [-] Conversión Bar->JSON={
   "i" : 1,
   "s" : "Prueba"
 }
  • Para una relación de clases anidadas, la utilización de anotaciones permite una automatización sencilla y eficiente. Así, dadas las siguientes entidades:
 import io.circe.generic.JsonCodec
 @JsonCodec case class Person2(name: String)
 @JsonCodec case class Greeting2(salutation: String, person: Person2, exclamationMarks: Int)
  • La operación de conversión de una entidad a JSON, se realiza de la siguiente forma:
 println(s"[*] CLASE COMPLEJA -> JSON, usando @JsonCodec=${Greeting2("Hey", Person2("Chris"), 3).asJson}")

La salida por consola es la siguiente:

 [*] CLASE COMPLEJA -> JSON, usando @JsonCodec={
   "salutation" : "Hey",
   "person" : {
   "name" : "Chris"
  },
   "exclamationMarks" : 3
 }
  • La operación de conversión de JSON a la entidad, se realiza de la siguiente forma:
 val greetingJSON =
 """
  {
    "salutation" : "Hey",
    "person" : {
    "name" : "Chris"
  },
    "exclamationMarks" : 3
  }
 """
 import io.circe.parser.decode
 println(s"Decodificacion=${decode[Greeting2](greetingJSON)}")

La salida por consola es la siguiente:

Decodificacion=Right(Greeting2(Hey,Person2(Chris),3))

Helper method

En ciertos momentos es necesario tener un codificador y un decodificador para una determinada entidad de negocio. Para ello, definimos un objeto con métodos helper de forma implícita. Así, para la siguiente entidad User definimos la clase codificadora de la siguiente forma:

  import io.circe.{Decoder, Encoder}
  case class User(id: Long, firstName: String, lastName: String)
  object UserCodec {
    implicit val decodeUser: Decoder[User] =
      Decoder.forProduct3("id", "first_name", "last_name")(User.apply)
    implicit val encodeUser: Encoder[User] =
      Encoder.forProduct3("id", "first_name", "last_name")(u =>
        (u.id, u.firstName, u.lastName)
      )
  }

En este caso hemos usado la función forProduct3 porque la entidad User tiene tres campos; pero, existen funciones forProductN (siendo N un número natural salvo el cero) para las clases con distinto número de campos. Un ejemplo de utilización de los helper definidos en el objeto UserCodec es el siguiente:

  println(s"[--] Codificación User->JSON =${encodeUser(User(id = 1, firstName = "11", lastName = "111"))}")
  val jsonPrueba = encodeUser(User(id = 69, firstName = "Prueba Decodificación", lastName = "Prueba Decodificación"))
  val cursor: HCursor = jsonPrueba.hcursor
  println(s"[--] Decodificación JSON->User =${decodeUser(cursor)}")

La salida por consola es la siguiente:

 [--] Codificación User->JSON ={
   "id" : 1,
   "first_name" : "11",
   "last_name" : "111"
  }
 [--] Decodificación JSON->User =Right(User(69,Prueba Decodificación,Prueba Decodificación))

Codificadores y Decodificadores a medida

En Circe existe la posibilidad de definir codificadores y decodificadores a medida. Para esta operación, utilizamos las entidades Encoder y Decoder. Así, para una entidad ejemplo Thing, las clases codificadores y decodificadoras se definen de la siguiente manera:

 import io.circe.{ Decoder, Encoder, HCursor, Json }
 class Thing(val foo: String, val bar: Int)
 implicit val encodeFoo: Encoder[Thing] = new Encoder[Thing] {
   final def apply(a: Thing): Json = Json.obj(
     ("foo", Json.fromString(a.foo)),
     ("bar", Json.fromInt(a.bar))
   )
 }
 implicit val decodeFoo2: Decoder[Thing] = new Decoder[Thing] {
   final def apply(c: HCursor): Decoder.Result[Thing] =
   for {
     foo <- c.downField("foo").as[String].right
     bar <- c.downField("bar").as[Int].right
   } yield {
     new Thing(foo, bar)
   }
  }

Para realizar la operación de codificación de un objeto de la clase Thing a un JSON, se realiza de la siguiente manera:

 println(s"[A*] Codificador de una clase THING->JSON = ${encodeFoo(new Thing("PruebaCliente",12))} ")

La salida por consola es la siguiente:

 [A*] Codificador de una clase THING->JSON = {
   "foo" : "PruebaCliente",
   "bar" : 12
 }

Para acceder a los valores de los campos definidos en el JSON, se realiza empleando el campo downField. La obtención de los campos del JSON anterior, se realiza de la siguiente forma:

 val cursorPruebaCodificada: HCursor = pruebaCodificado.hcursor
 println(s"[A*] Campo bar = ${cursorPruebaCodificada.downField("bar").as[Int].right } ")
 println(s"[A*] Campo foo = ${cursorPruebaCodificada.downField("foo").as[String].right } ")
 println(s"[A*] Campo JSON = ${cursorPruebaCodificada.top } ")

La salida por consola es la siguiente:

 [A*] Campo bar = RightProjection(Right(12)) 
 [A*] Campo foo = RightProjection(Right(PruebaCliente)) 
 [A*] Campo JSON = Some({
   "foo" : "PruebaCliente",
   "bar" : 12
 })

Para realizar la operación de decodificar un JSON a un objeto Thing, se utiliza un Decoder de la siguiente forma:

println(s"[A*] Decodificador JSON -> Thing = ${decodeFoo2( cursorPruebaCodificada ).right.e.right.get } ")

La salida por consola es la siguiente:

[A*] Decodificador JSON -> Thing = Thing(PruebaCliente,12)

Codificadores de claves

La existencia y sencillez de uso de los codificadores y decodificadores, permite emplear su funcionalidad para otras tareas como las codificaciones de claves. Circe proporciona dos elementos para para la codificación y decodificación de claves, respectivamente KeyEncoder y KeyDecoder.

Sea una clave definida por una clase Foo como sigue:

case class Foo(value: String)

Definimos los codificadores y decodificadores implícitos de la siguiente manera:

 implicit val fooKeyEncoder: KeyEncoder[Foo] = new KeyEncoder[Foo] {
   override def apply(foo: Foo): String = foo.value
 }
 implicit val fooKeyDecoder: KeyDecoder[Foo] = new KeyDecoder[Foo] {
   override def apply(key: String): Option[Foo] = Some(Foo(key))
 }

Sea la siguiente estructura Map con un conjunto de claves y valores:

 val map = Map[Foo, Int](
   Foo("hola") -> 123,
   Foo("mundo") -> 456
 )

El proceso de conversión de la estructura Map en JSON, se realiza de la siguiente forma:

 val mapJson= map.asJson
 println(s"Conversión Map[Foo, Int]-> Json =${mapJson}")

La salida por consola es la siguiente:

Conversión Map[Foo, Int]-> Json ={
 "hola" : 123,
 "mundo" : 456
}

El preceso de conversión de una estructura JSON a una estructura Map, se realiza decodificando los valores de la siguiente forma:

 println(s"Conversión Json -> Map[Foo, Int] =${mapJson.as[Map[Foo, Int]]}")

La salida por consola es la siguiente:

Conversión Json -> Map[Foo, Int] =Right(Map(Foo(hola) -> 123, Foo(mundo) -> 456))

En la siguiente entrada, Circe IV: ópticas, realizaré una descripción del uso de ópticas con Circe.

Circe II: manipulación y modificación de JSON

En la entrada anterior, Circe I: introducción y parseadores, realicé una introducción de la librería Circe y, además, describí como se realiza un parseo de una estructura JSON. En la presente entrada, Circe II: manipulación y modificación de JSON, describiré cómo se puede manipular una estructura JSON.

El sketchnote de la presente entrada, queda descrito en la siguiente imagen:

Las operaciones que trataré en la entrada son para la obtención de un campo determinado, o bien, para realizar una modificación de un campo. La importación de los elementos necesarios para manipular JSON son los siguientes:

 import cats.syntax.either._
 import io.circe._
 import io.circe.parser._

La estructura JSON para las pruebas es la siguiente:

val json: String =
 """
 {
 "id": "c730433b-082c-4984-9d66-855c243266f0",
 "name": "Foo",
 "counts": [1, 2, 3],
 "values": {
 "bar": true,
 "baz": 100.001,
 "qux": ["a", "b", "c"]
 }
 }
 """

Para manipular una estructura JSON, es necesario aplicar el parseador para determinar si está bien formado; para ello, realizamos los pasos definidos en la anterior entrada. Para nuestro ejemplo, el parseo se realiza de la siguiente forma:

val doc: Json = parse(json).getOrElse(Json.Null)

Acceso a datos

Para acceder a los datos existentes en un JSON es necesario definir un cursor a partir del objeto obtenido del parseo de la estructura JSON. La definición del cursor en nuestro ejemplo es el siguiente:

val cursor: HCursor = doc.hcursor

Una vez creado el cursor, estamos en disposición para acceder a los datos, existiendo dos formas de acceso, las cuales son las siguientes:

  • La primera forma para acceder a un campo utilizamos la función downField(«NombreCampo»), para obtener del cursor el elemento referenciado para todos los campos; y, una vez posicionados en el campo seleccionado, utilizamos la función as especificando el tipo. Un ejemplo de esta forma de acceso es la siguiente:
 val valueBaz1: Decoder.Result[Double] = cursor.downField("values").downField("baz").as[Double]
 println(s"[*] Valor 'baz' del JSON forma 1=${valueBaz1}")
 println

 La salida por consola es la siguiente:

  [*] Valor 'baz' del JSON forma 1=Right(100.001)
  • La segunda forma para acceder a un campo utilizamos la función downField(«NombreCampo»), para obtener el cursor al elemento referenciado; y, con ésta referencia, utilizamos la función get especificando el tipo y el nombre del campo. Un ejemplo de esta forma de acceso es la siguiente:
 val valueBaz2: Decoder.Result[Double] = cursor.downField("values").get[Double]("baz")
 println(s"[*] Valor 'baz' del JSON forma 2=${valueBaz1}")
 println

La salida por consola es la siguiente:

[*] Valor 'baz' del JSON forma 2=Right(100.001)
  • Para acceder a una lista de elementos utilizamos la función downField(«NombreCampo») y, además, la función downArray junto a la
    función as con el tipo. Un ejemplo de acceso al primer elemento, último elemento y acceso al elemento colocado a la derecha del activo es el siguiente:
 val secondQux1: Decoder.Result[String] = cursor.downField("values").downField("qux").downArray.right.as[String]
 println(s"[*] Valor del array del campo 'qux' del JSON =${secondQux1}")
 println
 val firstQux: Decoder.Result[String] = cursor.downField("values").downField("qux").downArray.first.as[String]
 println(s"[*] Valor del array del campo 'qux' del JSON =${firstQux}")
 println
 val lastQux: Decoder.Result[String] = cursor.downField("values").downField("qux").downArray.last.as[String]
 println(s"[*] Valor del array del campo 'qux' del JSON =${lastQux}")
 println

La salida por consola es la siguiente:

[*] Valor del array del campo 'qux' del JSON =Right(b)
[*] Valor del array del campo 'qux' del JSON =Right(a)
[*] Valor del array del campo 'qux' del JSON =Right(c)

Modificación de datos

Para realizar la modificación de un campo es necesario realizar funciones parecidas al acceso de datos. Es necesario utilizar la función downField y las función withFocus. Así, unos ejemplos de modificación son los siguientes:

  •  Modificación del campo name de la estructura JSON de ejemplo asignando su valor del revés y su posterior visualización, se realiza de la siguiente forma:
 val reversedNameCursor: ACursor = cursor.downField("name").withFocus(_.mapString(_.reverse))
 val reversedName: Option[Json] = reversedNameCursor.top // Retorna todo el JSON.
 println(s"[-] Operación Reverse del JSON =${reversedName}")
 println

La salida por consola es la siguiente:

[-] Operación Reverse del JSON =Some({
 "id" : "c730433b-082c-4984-9d66-855c243266f0",
 "name" : "ooF",
 "counts" : [
 1,
 2,
 3
 ],
 "values" : {
 "bar" : true,
 "baz" : 100.001,
 "qux" : [
 "a",
 "b",
 "c"
 ]
 }
})
  • La asignación del valor «VALOR MODIFICADO» al campo name y su posterior visualización, se realiza de la siguiente forma:
 val modificacion1ameCursor: ACursor = cursor.downField("name").withFocus(_.mapString( elem => "VALOR MODIFICADO"))
 println(s"[-] Modificación del campo name del JSON =${modificacion1ameCursor.top}")
 println

La salida por consola es la siguiente:

[-] Modificación del campo name del JSON =Some({
 "id" : "c730433b-082c-4984-9d66-855c243266f0",
 "name" : "VALOR MODIFICADO",
 "counts" : [
 1,
 2,
 3
 ],
 "values" : {
 "bar" : true,
 "baz" : 100.001,
 "qux" : [
 "a",
 "b",
 "c"
 ]
 }
})

En la siguiente entrada, Circe III: encoding y decoding, realizaré una descripción de cómo realizar operaciones de codificación y decodificación de estructuras JSON con Circe.

Circe I: introducción y parseadores

En todos los sistemas informáticos es necesario el intercambio información mediante unas estructuras de datos. Una de las opciones de intercambio, es la utilización de ficheros; estos ficheros, pueden estár definidos mediante estructuras de datos tipo XML o JSON. En esta entrada, no voy a describir las ventajas de cada uno, ni en qué circunstancias hay que utilizar cada una de ellas; sólamente, me centrará en la librería Circe la cual es una librería de manipulación de estructuras de tipo JSON.

El sketchnote de la presente entrada, queda descrito en la siguiente imagen:

JSON es una acrónimo de JavaScript Object Notation. Es un formato ligero para el intercambio de datos y, debido a su amplio uso, es una alternativa a XML, considerándose como un lenguaje independiente. Un ejemplo de estructura en JSON es el siguiente:

{
 "foo": "bar",
 "baz": 123,
 "list": [ 4, 5, 6 ]
 }

El ejemplo anterior, está compuesto de una estructura formada por tres datos: foo, de tipo String; baz, de tipo entero; y, list, estructura de tipo lista de enteros.

Circe

Para la manipulación de estructuras JSON en lenguaje Scala y en Scala.js, se puede utilizar la librería Circe. La librería Circe es un fork de la librería Argonaut.

La presente entrada, Circe I: introducción y parseadores, es una introducción a la librería y una descripción de cómo se parsea una estructura JSON. En las siguientes entradas, describiré el resto de elementos de Circe.

Actualmente, Circe está en la versión 0.9.1 y tiene dependencia con Scalaz y Cats. La definición de las dependencias en sbt se realiza de la siguiente forma:

 libraryDependencies += "io.circe" %% "circe-core" % "0.9.1",
 libraryDependencies += "io.circe" %% "circe-generic" % "0.9.1",
 libraryDependencies += "io.circe" %% "circe-parser" % "0.9.1",
 libraryDependencies += "io.circe" %% "circe-optics" % "0.9.1"

Si se emplea la versión de Scala 2.10, es necesario la utilización de plugin «Paradise». La definición del plugin se realiza como sigue:

addCompilerPlugin("org.scalamacros" % "paradise" % "2.0.1" cross CrossVersion.full)

Circe no provee ni usa lentes. Si se desea la utilización de lentes es necesario utilizar la librería Monocle. Para el lector interesado en Monocle, puede consultar las entradas que he realizado de la librería cuya primera entrada es Monocle I: introducción y lente Iso

Paseadores de JSON

Los parseadores de JSON son aquellos elementos que determinan si una entrada cumple con las reglas JSON, o bien, mostrar un mensaje informativo.

Las elementos necesarios para operar con los parseadores se encuentran en los siguientes paquetes de Circe:

 import io.circe._
 import io.circe.parser._

El juego de pruebas para los ejemplos son los siguientes:

val rawJson: String =
 """
 {
   "foo": "bar",
   "baz": 123,
   "list of stuff": [ 4, 5, 6 ]
 }
 """
 val badJson: String = "lol"

Para ejecutar los ejemplos, es necesario utilizar la función parse y, unos ejemplos de uso, son los siguientes:

 val parseResult = parse(rawJson)
 println(s"[1] Parser Json puro=${parseResult}")
 println
 // Retorno un Either: Left, con el error.
 val parseBadJson = parse(badJson)
 println(s"[2] Parse Json erróneo=${parseBadJson}")
 println

La salida por consola es la siguiente:

[1] Parser Json puro=Right({
 "foo" : "bar",
 "baz" : 123,
 "list of stuff" : [
    4,
    5,
    6
  ]
 })

[2] Parse Json erróneo=Left(io.circe.ParsingFailure: expected json value got l (line 1, column 1))

Los ejemplos anteriores, se pueden tratar como Pattern Matching de la siguiente forma:

 parse(rawJson) match {
   case Right(json) => println(s"[3] JSON válido: ${json}")
   case Left(failure) => println(s"[3] JSON no válido")
 }
 println
 parse(badJson) match {
   case Right(json) => println(s"[4] JSON válido: ${json}")
   case Left(failure) => println(s"[4] JSON no válido")
 }
 println

La salida por consola es la siguiente:

 [3] JSON válido: {
 "foo" : "bar",
 "baz" : 123,
 "list of stuff" : [
   4,
   5,
   6
 ]
 }
 [4] JSON no válido

Para controlar los valores null, utilizamos la función getOrElse de la librería cats.syntax.either de la sigueinte forma:

 import cats.syntax.either._
 val json: Json = parse(rawJson).getOrElse(Json.Null)
 println(s"[5] Ejemplo getOrElse=${json}")
 println
 val json2: Json = parse(badJson).getOrElse(Json.Null)
 println(s"[6] Ejemplo getOrElse=${json2}")
 println

La salida por consola es la sigueinte:

 [5] Ejemplo getOrElse={
 "foo" : "bar",
 "baz" : 123,
 "list of stuff" : [
   4,
   5,
   6
  ]
 }
 [6] Ejemplo getOrElse=null

En la siguiente entrada, Circe II: manipulación y modificación de JSON, realizaré una descripción de cómo manipular las estructuras JSON con Circe.