AWS Redshift y Luigi, módulo Redshift: operación de consulta

En la presente entrada, AWS Redshift y Luigi, módulo Redshift: operación de consulta , realizaré la descripción de cómo se define una tarea para realizar una consulta o ejecución de una sentencia SQL sobre Amazon Redshift.

Para realizar las operaciones de consulta en Redshift es necesario la implementación de una clase que herede de la clase RedshiftQuery del módulo de Redshift de Luigi. La importación de la clase, se realiza de la siguiente forma:

from luigi.contrib.redshift import RedshiftQuery

La solución que propongo es una jerarquía de clases para poder definir varias consultas. La solución está compuesta por una clase que hereda de RedshiftQuery y la clase que ejecuta una consulta. La clase genérica de consultas con la carga de la configuración es la siguiente:

class AbstractRedshiftQuery(RedshiftQuery):
  """Clase genérica para la realización de consultas en Redshift"""
  config = RedshiftConnection()
  host = config.host
  database = config.database
  user = config.user
  password = config.password
  def output(self):
    return RedshiftTarget(
    host=self.host,
    database=self.database,
    user=self.user,
    password=self.password,
    table=self.table,
    update_id=self.update_id,
    port=5439)

En el snippet anterior, se define la clase AbstractRedshiftQuery que hereda de RedshiftQuery; y, además, se define la clase RedshiftConnection con los datos de configuración de la conexión a Redshift. Los datos de configuración son: host, database, user y password.

Una vez definida la clase AbstractRedshiftQuery, definimos la clase con la consulta. La clase con la sentencia SQL es la siguiente:

class CreateHistorico(AbstractRedshiftQuery):
  """
  La tarea CreateHistorico realiza la creación de una tabla.
  """
  table = "tabla_historia"
  query = """
    CREATE TABLE tabla_historia
    (
     sociedad varchar(60),
     fecha timestamp
    )
  """

En el snippet anterior, se define la clase CreateHistorico que realiza la creación de una tabla en Redshift la cual hereda de la clase AbstractRedshiftQuery. Para realizar la consulta, es necesario definir el atributo query con la sentencia a ejecutar y el atributo table. El resultado es la creación de la tabla tabla_historica en Redshift.

Para su ejecución, se realiza por los procedimientos normales de arranca de Luigi.

En este caso, se ha realizado la creación de una tabla pero, se puede definir cualquier otra sentencia: inserción, borrado,…

Para el lector interesado, las entradas publicadas hasta la fecha son las siguientes:

AWS S3 y Luigi, módulo S3: eliminación y copiado de carpetas

En la presente entrada, AWS S3 y Luigi, módulo S3: eliminación y copiado de carpetas, realizaré la descripción de la tareas para realizar con Luigi las operaciones de eliminación y copiado de carpetas de un bucket en  Amazon S3 con Luigi. Para realizar las operaciones sobre S3 de Amazon con Luigi, necesitamos instanciar un cliente S3Client dentro de una tarea.

En nuestro caso, definiré una jerarquía de clases en donde la clase padre define la referencia a un cliente S3 de Luigi: de esta forma, las clases hijos simplemente deben de pasar los parámetros con los datos necesarios. La importación del cliente es la siguiente:

from luigi.contrib.s3 import S3Client

La cabecera de la clase con la definición de la referencia al cliente es la siguiente:

class AbstractS3Client(luigi.Task):
  """
  La clase AbstractS3Client realiza la operaciones básicas en S3.
  """
  s3config = S3Configuration()
  s3Client = S3Client(profile_name=s3config.profileName, host=s3config.awsHostS3)
  [...]

Operación de eliminación

La operación de eliminación del contenido de una carpeta de S3 se realiza empleando la función remove del cliente S3Client. Además del path de la carpeta que se quiere eliminar, es necesario el nombre del Bucket de S3.

El snippet con el código con la funcionalidad de la eliminación de la carpeta es la siguiente:

def remove(self, bucketName, path):
  """
  El método removeProcess realiza la eliminación del contenido definido en un path pasado por parámetro de un
  bucket determinado.
  :param bucketName: Nombre del bucket
  :param path: Path
  :return:
  """
  path = "s3://{bucketName}/{path}/".format(bucketName=bucketName, path=path)
  if self.s3Client.exists(path):
    logging.info("\n[AbstractS3Client.removeProcess] Verificado el path={path}".format(path=path))
    resultRemove = self.s3Client.remove(path)
    if resultRemove:
      logging.info("[AbstractS3Client.removeProcess] Eliminación de {path}: OK".format(path=path))
    else:
      logging.info("[AbstractS3Client.removeProcess] Eliminación de {path}: KO".format(path=path))
  else:
    logging.info("[AbstractS3Client.removeProcess] NO Verificado el path={path}".format(path=path))

En el snippet anterior, código que se integra en la clase AbstractS3Client, define el path de la carpeta del bucket a eliminar con la variable path; se realiza la comprobación de existencia del bucket que se quiere eliminar con la función exists; si la comprobación anterior es correcta, se realiza la eliminación con la función remove, en otro caso, se muestra un mensaje informativo.

Operación de copia

La operación de copiado del contenido de una carpeta origen a otra carpeta destino, se realiza con la función copy del cliente S3Client. Además, del path de la carpeta origen y destino, es necesario el nombre del bucket de S3.

El snippet con el código con la funcionalidad de la operación de copiado de las carpetas, es el siguiente:

def copy(self, bucketName, sourceFolder, targetFolder):
  """
  El método copyProcess realiza el copiado del contenido de una carpeta origen(sourceFolder) a una carpeta
  destino (targetFolder) de un bucket determinado (bucketName).
  :param bucketName: Nombre del bucket
  :param sourceFolder: Carpeta origen
  :param targetFolder: Carpeta destino.
  :return:
  """
  sourceFile = "s3://{bucketName}/{sourceFolder}/".format(bucketName=bucketName, sourceFolder=sourceFolder)
  if self.s3Client.isdir(sourceFile):
    targetFile = "s3://{bucketName}/{targetFolder}/".format(bucketName=bucketName, targetFolder=targetFolder)
    if self.s3Client.isdir(targetFile):
      resultadoCopiado = self.s3Client.copy(sourceFile, targetFile)
      logging.info("[AbstractS3Client.copy] Copiado contenido DESDE:{sourceFile} A:{targetFile}".format(sourceFile=sourceFile, targetFile=targetFile))
      logging.info("[AbstractS3Client.copy] Numero de ficheros copiados={resultadoCopiado0}".format(resultadoCopiado0=resultadoCopiado[0]))
      logging.info("[AbstractS3Client.copy] Tamaño copiado={resultadoCopiado1}".format(resultadoCopiado1=resultadoCopiado[1]))
    else:
      logging.info("[AbstractS3Client.copy] {targetFile} no es un directorio.".format(targetFile=targetFile))
  else:
    logging.info("[AbstractS3Client.copy] {sourceFile} no es un directorio.".format(sourceFile=sourceFile))

En el snippet anterior, código que se integra en la clase AbstractS3Client, define el path de la carpeta origen de copiado definido en la variabla sourceFile; se realiza la comprobación de existencia del directorio con la función isDir; si el path del directorio es correcto, se calcula el path de la carpeta destino de copiado definido en la variable targetFile; si el path del directorio destino es correcto empleando la función isDir, se realiza el copiado del contenido de la carpeta origen en la destino; y, de todas las comprobaciones, se muestra el mensaje de error si no se cumple.

Instancia de clases

Para ejecutar la funcionalidad descrita en el apartado anterior, es necesario definir una clase que herede de la clase AbstractS3Client. Al ser esta clase una tarea de Luigi, es necesario la implementación del método run. Si se desea que la tarea esté enlazada con otras tareas, es necesario definir el método output.

El snippet de una clase que hereda de la clase AbstractS3Client para realizar la eliminación de una carpeta es la siguiente:

class DeleteS3(AbstractS3Client):
  def run(self):
    s3Configuration = S3Configuration()
    self.remove(s3Configuration.bucketName, s3Configuration.sourceFolder)

El snippet anterior, define la clase S3Configuration la cual define las siguientes propiedades: bucketName, nombre del bucket; sourceFolder, nombre de la carpeta a eliminar; y, además, define la invocación del método remove para ejecutar la eliminación.

El snippet de una clase que hereda de la clase AbstractS3Client para realizar el copiado del contenido de una carpeta es el siguiente:

class CopyS3(AbstractS3Client):
  def run(self):
    s3Configuration = S3Configuration()
    self.copyProcess(s3Configuration.bucketName, s3Configuration.sourceFolder, s3Configuration.targetFolder)

El snippet anterior, define la clase S3Configuration la cual define las siguientes propiedades: bucketName, nombre del bucket; sourceFolder, nombre de la carpeta origen a copiar; targetFolder, carpeta destino de copiado; y, además, define la invocación del método remove para ejecutar la eliminación.

Las operaciones con Amazon S3 son sencillas de realizar con Luigi. Lo que se tiene que tener claro son todos aquellos datos de configuración para la ejecución de las operaciones.

Para el lector interesado, las entrada publicadas hasta la fecha sobre Luigi son las siguientes:

 

Luigi

En el mundo Big Data y BI, los ETL son unas de las operaciones iniciales para preparar los datos para las tareas de inteligencia; con dichos datos, una vez cargados, se realizan las operaciones para la ayuda en la toma de decisiones. En la presente entrada, «Luigi», defino qué es un ETL y presentar una solución para realizar ETL`s con la solución Luigi.

Definimos un ETL, según la definición en wikipedia, como aquel proceso de extracción, transformación y carga de datos de una fuente de datos a una base da datos, data mart o data warehouse.

La solución tecnológica para la implementación de ETL que describiré en la presente entrada es Luigi  la cual pertenece al contexto del lenguaje Python. Luigi es un paquete de Python el cual permite la construcción de trabajos batch complejos, visualización del estado de procesamiento y encadenamientos de tareas. Luigi es una solución que ha sido adaptada por empresas como Spotify o Red Hat.

El scketchnote de la entrada es el representado en la siguiente imagen:

Instalación

El proceso de instalación de del paquete Luigi es el típico de instalación de un paquete en Python mediante la herramienta pip. Así, el comando de instalación es el siguiente:

pip install luigi

Elementos

Los elementos básicos que intervienen en la definición de un flujo de proceso en Luigi son tres: target, corresponde a una fuente de entrada o salida de datos; un task, tarea de procesamiento; y, parameter, correspondiente a los parámetros de intercambio entre las distintas tareas del flujo de trabajo.

Target

Un Target es aquella clase abstracta que implementa un punto de entrada o salida del flujo; por ejemplo, puede ser: un fichero, una base de datos, un bucket en AWS S3,…

Un Target está compuesta por una jerarquía de clases, como por ejemplo: LocalTarget, S3Target, MySqlTarget, RedshiftTarget; en definitiva, son clases específicas para aquella fuente de datos con las que un flujo trabaja.

Task

Un Task es aquella unidad computacional que realiza una operación con unos datos determinados. Task es una clase abstracta cuyas clases hijas deben de implementar unos métodos básicos: require, para definir las dependencias con otras tareas; output, para definir el resultado del procesamiento; y, por último, run, para definir el procesamiento propiamente dicho.

Parameter

Un Parameter son aquellos elementos de entrada de una tarea. Un Parameter, de la misma forma que los elementos anteriores, es una clase abstracta.

Ejemplos

Operación Suma

La operación de suma consiste en definir una tarea que recibiendo dos parámetros enteros como entrada realice la suma de los mismos. Para realizarlo, definiremos un fichero Python tasksuma.py con el siguiente contenido:

import luigi
class Sumatorio(luigi.Task):
  x = luigi.IntParameter(default=10)
  y = luigi.IntParameter(default=45)

 def run(self):
     print("[***]")
     print("[***] Resultado de la suma:", self.x + self.y)
     print("[***]")

El contenido consiste en la definición de una clase Sumatorio que hereda de la clase Task. La clase Sumatorio recibe dos parámetros enteros; dichos parámetros, tienen unos valores por defecto en el caso de que no existan los valores de entrada. El método run realiza la operación de suma y la escritura por consola.

Para la ejecución de la tarea, en la carpeta raíz del proyecto Python, ejecutamos el siguiente comando:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --x 69 --y 10 --local-scheduler

La salida por consola contiene el resultado de la ejecución de la tarea y las trazas del procesamiento. Así, sin entrar en detalle de todos los mensajes, la salida es la siguiente:

INFO: Informed scheduler that task Sumatorio_69_10_0543bfea4e has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) running Sumatorio(x=69, y=10)
[***]
[***] Resultado de la suma: 79
[***]
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) done Sumatorio(x=69, y=10)
[...]
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
 - 1 Sumatorio(x=69, y=10)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

Si no pasamos los valores enteros por parámetro a la tarea, el comando de ejecución sería el siguiente:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --local-scheduler

La salida por consola sería parecida a la anterior, residiendo la diferencia en los valores numéricos de los campos que serían los valores por defecto.

Operaciones anidadas

Todo proceso de transformación requiere la realización de varias actividades y, en muchas ocasiones, esas actividades tienen que ser realizadas por tareas encadenadas. En el siguiente ejemplo, se realizan dos tareas encadenadas definidas en el fichero Python task_anidadas.py

La primera tarea, clase GenerateWords, realiza la creación de un fichero palabras.txt con el contenido de una lista de palabras; y, la segunda tarea, clase CountLetters, realiza la lectura del fichero generado en la tarea anterior, contabiliza la lectura longitud de las palabras y, como resultado, crea un fichero con la palabra y su longitud.

La clase GenerateWords se define de la siguiente forma:

class GenerateWords(luigi.Task):
 def output(self):
    return luigi.LocalTarget('/tmp/LuigiEjem1/palabras.txt')

 def run(self):
    palabras = ['manzana', 'plátano', 'mandarina']
    with self.output().open('w') as f:
      for palabra in palabras:
      f.write('{word}\n'.format(word=palabra))

La clase GenerateWords está compuesta de dos métodos: el método output, en el cual se define el target de salida, es decir, se define el fichero de salida del procesamiento; y, en el método run, se realiza la escritura en el fichero definido en el método output el contenido de la lista palabras.

La clase CountLetters se define de la siguiente forma:

class CountLetters(luigi.Task):
 def requires(self):
   return GenerateWords()

 def output(self):
   return luigi.LocalTarget('/tmp/LuigiEjem1/contador_letras.txt')

 def run(self):
   print("[***]")
   print("[***]self.input().open('r')=", self.input().open('r'))
   print("[***]")
   with self.input().open('r') as ficheroEntrada:
   palabras = ficheroEntrada.read().splitlines()
   with self.output().open('w') as ficheroSalida:
      for palabra in palabras:
      ficheroSalida.write(
       '{word} | {letter_count}\n'.format(
       word=palabra,
       letter_count=len(palabra)
      )
     )

La clase CountLetters está compuesta de tres métodos: requires, método en el que se define la dependencia con la tarea definida en la clase GenerateWords; método output, método en el que se define el target de salida, es decir, el fichero contador_letras; y, finalmente, método run, método que realiza la lectura del fichero generado por la tarea GenerateWord, contabilización de las longitudes de las palabras existentes y escritura en el fichero de salida contador_letras.txt

Para la ejecución del proceso definido por las anteriores tareas, se ejecuta el siguiente comando en la carpeta raíz del proyecto:

PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler

El resultado es la creación en la carpeta ‘/tmp/LuigiEjem1’ de dos ficheros: palabras.txt y contador_letras.txt. El contenido del fichero palabras.txt es el siguiente:

manzana
plátano
mandarina

El contenido del fichero contador_letras.txt es el siguiente:

manzana | 7
plátano | 7
mandarina | 9

4.- Visualización

Para la visualización del estado de ejecución, Luigi posee una herramienta gráfica en la cual se muestra la información del estado de ejecución. Esta herramienta de visualización es un servidor que hay que arrancar; para ello, se emplea el siguiente comando:

luigid --background --port=8082 --logdir=logs

El aspecto del interfaz es el siguiente:

La información visual que muestra en el GUI es relacionada con las tareas que son ejecutas, el estado en el que se encuentra cada una de ellas y las dependencias gráficas entre cada una.

Para que el servidor monitorice el estado de ejecución de cada tarea, es necesario que en el arranque se especifique que el servidor controlará su ejecución. Así, para poderlo realizar y basándonos en los ejemplos anteriores, es necesario eliminar el flag –local-scheduler. Desde un punto de vista práctico, los escenarios son los siguientes:

  • Ejecución en local de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler
  • Ejecución en el servidor Luigi de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters

Conclusiones

Los ETL se basan en procesos de datos simples: lectura de fuente de datos, transformación y escritura en un repositorio de datos destinos. Dichos procesos, en función de la complejidad del negocio, su nivel de dificultad puede variar.

Luigi es una solución del ecosistema de soluciones del lenguaje Python; pero, mientras que aprendía Luigi, me he acortado por el parecido funcionamiento a SpringBatch, solución propuesta por Spring Framework en el ecosistema Java. Para el lector interesado, publiqué hace tiempo unas entradas sobre SpringBatch

Desde mi punto de vista, la dificultad no reside en la solución tecnológica, la dificultad reside en determinar qué datos son los que se tienen que cargar y qué transformaciones se tienen que aplicar a los datos.