Luigi

En el mundo Big Data y BI, los ETL son unas de las operaciones iniciales para preparar los datos para las tareas de inteligencia; con dichos datos, una vez cargados, se realizan las operaciones para la ayuda en la toma de decisiones. En la presente entrada, “Luigi”, defino qué es un ETL y presentar una solución para realizar ETL`s con la solución Luigi.

Definimos un ETL, según la definición en wikipedia, como aquel proceso de extracción, transformación y carga de datos de una fuente de datos a una base da datos, data mart o data warehouse.

La solución tecnológica para la implementación de ETL que describiré en la presente entrada es Luigi  la cual pertenece al contexto del lenguaje Python. Luigi es un paquete de Python el cual permite la construcción de trabajos batch complejos, visualización del estado de procesamiento y encadenamientos de tareas. Luigi es una solución que ha sido adaptada por empresas como Spotify o Red Hat.

El scketchnote de la entrada es el representado en la siguiente imagen:

Instalación

El proceso de instalación de del paquete Luigi es el típico de instalación de un paquete en Python mediante la herramienta pip. Así, el comando de instalación es el siguiente:

pip install luigi

Elementos

Los elementos básicos que intervienen en la definición de un flujo de proceso en Luigi son tres: target, corresponde a una fuente de entrada o salida de datos; un task, tarea de procesamiento; y, parameter, correspondiente a los parámetros de intercambio entre las distintas tareas del flujo de trabajo.

Target

Un Target es aquella clase abstracta que implementa un punto de entrada o salida del flujo; por ejemplo, puede ser: un fichero, una base de datos, un bucket en AWS S3,…

Un Target está compuesta por una jerarquía de clases, como por ejemplo: LocalTarget, S3Target, MySqlTarget, RedshiftTarget; en definitiva, son clases específicas para aquella fuente de datos con las que un flujo trabaja.

Task

Un Task es aquella unidad computacional que realiza una operación con unos datos determinados. Task es una clase abstracta cuyas clases hijas deben de implementar unos métodos básicos: require, para definir las dependencias con otras tareas; output, para definir el resultado del procesamiento; y, por último, run, para definir el procesamiento propiamente dicho.

Parameter

Un Parameter son aquellos elementos de entrada de una tarea. Un Parameter, de la misma forma que los elementos anteriores, es una clase abstracta.

Ejemplos

Operación Suma

La operación de suma consiste en definir una tarea que recibiendo dos parámetros enteros como entrada realice la suma de los mismos. Para realizarlo, definiremos un fichero Python tasksuma.py con el siguiente contenido:

import luigi
class Sumatorio(luigi.Task):
  x = luigi.IntParameter(default=10)
  y = luigi.IntParameter(default=45)

 def run(self):
     print("[***]")
     print("[***] Resultado de la suma:", self.x + self.y)
     print("[***]")

El contenido consiste en la definición de una clase Sumatorio que hereda de la clase Task. La clase Sumatorio recibe dos parámetros enteros; dichos parámetros, tienen unos valores por defecto en el caso de que no existan los valores de entrada. El método run realiza la operación de suma y la escritura por consola.

Para la ejecución de la tarea, en la carpeta raíz del proyecto Python, ejecutamos el siguiente comando:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --x 69 --y 10 --local-scheduler

La salida por consola contiene el resultado de la ejecución de la tarea y las trazas del procesamiento. Así, sin entrar en detalle de todos los mensajes, la salida es la siguiente:

INFO: Informed scheduler that task Sumatorio_69_10_0543bfea4e has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) running Sumatorio(x=69, y=10)
[***]
[***] Resultado de la suma: 79
[***]
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) done Sumatorio(x=69, y=10)
[...]
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
 - 1 Sumatorio(x=69, y=10)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

Si no pasamos los valores enteros por parámetro a la tarea, el comando de ejecución sería el siguiente:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --local-scheduler

La salida por consola sería parecida a la anterior, residiendo la diferencia en los valores numéricos de los campos que serían los valores por defecto.

Operaciones anidadas

Todo proceso de transformación requiere la realización de varias actividades y, en muchas ocasiones, esas actividades tienen que ser realizadas por tareas encadenadas. En el siguiente ejemplo, se realizan dos tareas encadenadas definidas en el fichero Python task_anidadas.py

La primera tarea, clase GenerateWords, realiza la creación de un fichero palabras.txt con el contenido de una lista de palabras; y, la segunda tarea, clase CountLetters, realiza la lectura del fichero generado en la tarea anterior, contabiliza la lectura longitud de las palabras y, como resultado, crea un fichero con la palabra y su longitud.

La clase GenerateWords se define de la siguiente forma:

class GenerateWords(luigi.Task):
 def output(self):
    return luigi.LocalTarget('/tmp/LuigiEjem1/palabras.txt')

 def run(self):
    palabras = ['manzana', 'plátano', 'mandarina']
    with self.output().open('w') as f:
      for palabra in palabras:
      f.write('{word}\n'.format(word=palabra))

La clase GenerateWords está compuesta de dos métodos: el método output, en el cual se define el target de salida, es decir, se define el fichero de salida del procesamiento; y, en el método run, se realiza la escritura en el fichero definido en el método output el contenido de la lista palabras.

La clase CountLetters se define de la siguiente forma:

class CountLetters(luigi.Task):
 def requires(self):
   return GenerateWords()

 def output(self):
   return luigi.LocalTarget('/tmp/LuigiEjem1/contador_letras.txt')

 def run(self):
   print("[***]")
   print("[***]self.input().open('r')=", self.input().open('r'))
   print("[***]")
   with self.input().open('r') as ficheroEntrada:
   palabras = ficheroEntrada.read().splitlines()
   with self.output().open('w') as ficheroSalida:
      for palabra in palabras:
      ficheroSalida.write(
       '{word} | {letter_count}\n'.format(
       word=palabra,
       letter_count=len(palabra)
      )
     )

La clase CountLetters está compuesta de tres métodos: requires, método en el que se define la dependencia con la tarea definida en la clase GenerateWords; método output, método en el que se define el target de salida, es decir, el fichero contador_letras; y, finalmente, método run, método que realiza la lectura del fichero generado por la tarea GenerateWord, contabilización de las longitudes de las palabras existentes y escritura en el fichero de salida contador_letras.txt

Para la ejecución del proceso definido por las anteriores tareas, se ejecuta el siguiente comando en la carpeta raíz del proyecto:

PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler

El resultado es la creación en la carpeta ‘/tmp/LuigiEjem1’ de dos ficheros: palabras.txt y contador_letras.txt. El contenido del fichero palabras.txt es el siguiente:

manzana
plátano
mandarina

El contenido del fichero contador_letras.txt es el siguiente:

manzana | 7
plátano | 7
mandarina | 9

4.- Visualización

Para la visualización del estado de ejecución, Luigi posee una herramienta gráfica en la cual se muestra la información del estado de ejecución. Esta herramienta de visualización es un servidor que hay que arrancar; para ello, se emplea el siguiente comando:

luigid --background --port=8082 --logdir=logs

El aspecto del interfaz es el siguiente:

La información visual que muestra en el GUI es relacionada con las tareas que son ejecutas, el estado en el que se encuentra cada una de ellas y las dependencias gráficas entre cada una.

Para que el servidor monitorice el estado de ejecución de cada tarea, es necesario que en el arranque se especifique que el servidor controlará su ejecución. Así, para poderlo realizar y basándonos en los ejemplos anteriores, es necesario eliminar el flag –local-scheduler. Desde un punto de vista práctico, los escenarios son los siguientes:

  • Ejecución en local de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler
  • Ejecución en el servidor Luigi de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters

Conclusiones

Los ETL se basan en procesos de datos simples: lectura de fuente de datos, transformación y escritura en un repositorio de datos destinos. Dichos procesos, en función de la complejidad del negocio, su nivel de dificultad puede variar.

Luigi es una solución del ecosistema de soluciones del lenguaje Python; pero, mientras que aprendía Luigi, me he acortado por el parecido funcionamiento a SpringBatch, solución propuesta por Spring Framework en el ecosistema Java. Para el lector interesado, publiqué hace tiempo unas entradas sobre SpringBatch

Desde mi punto de vista, la dificultad no reside en la solución tecnológica, la dificultad reside en determinar qué datos son los que se tienen que cargar y qué transformaciones se tienen que aplicar a los datos.