Microservicios en Python: plantilla básica.

En la presente entrada, Microservicios en Python: plantilla básica, realizaré una descripción de una plantilla base de un microservicio en Python utilizando la librería Flask.

La arquitectura de microservicios es aquel enfoque que permite definir aplicaciones software mediante un conjunto de servicios desplegables de forma independiente, es decir, una aplicación es un conjunto de pequeñas aplicaciones poco acopladas. La definición de microservicio que realiza Martin Fowler es la siguiente:

«El término ‘Arquitectura de microservicio’ ha surgido en los últimos años para describir una forma particular de diseñar aplicaciones de software como conjuntos de servicios desplegables de forma independiente. Si bien no existe una definición precisa de este estilo arquitectónico, existen ciertas características comunes en torno a la organización en torno a la capacidad empresarial, la implementación automatizada, la inteligencia en los puntos finales y el control descentralizado de idiomas y datos.»

El acoplamiento entre los microservicios, se puede realizar utilizando colas, brokers de mensajería o mediante peticiones HTTP. Un ejemplo de un productor y consumidor de mensajes para el broker de mensjaes que contiene Redis pueden ser los que describo en los siguientes enlaces:

La funcionalidad de la plantilla del microservicio es muy simple, se definirá un punto de entrada de tipo POST al cual se le pasarán los campos nombre, operación y operador y, como resultado, retornará un JSON con el resultado. Se empleará la técnica DDD Domain Driven Design para definir una entidad de dominio la cual será almacenda en un supuesto contenedor de datos, en nuestro caso, en memoria.

Las dependencias de las librerías del proyecto se definen en el fichero requirements.txt y contiene las siguientes referencias: flask, dataclasses y pytest.

La arquitectura está compuesta por tres capas horizontales: capa de presentación, representada por el paquete entrypoints; cada de servicios, representada por el paquete services; y, capa de datos, representada por el paquete repository. Desde un punto de vista vertical, tenemos las siguientes capas: capa de dominio, representada por el paquete domain en donde se define las entidades de dominio y DTO; y, por último,capa de excepciones, representado con el paquete exception en cual contiene las excepciones del
aplicativo.

Descripción arquitectónica por capas

Capa de dominio

La capa de dominio está compuesto por el módulo entity_model.py el cual contiene la entidad de dominio UseCaseEntity y los DTO UseCaseRequest y UseCaseResponse. El snippet de la entidad de dominio es el siguiente:

class UseCaseEntity:
    def __init__(self,
                 uuid: str,
                 name: str,
                 operation: str,
                 operator: int,
                 date_data: Optional[date] = None):
        self.uuid = uuid
        self.name = name
        self.operation = operation
        self.operator = operator
        self.date = date_data
    @property
    def calculate(self) -> int:
        result = 0
        if self.operation == "+":
             result = self.operator + self.operator
        elif self.operation == "*":
             result = self.operator * self.operator
        else:
             result = -1
        return result

Capa de presentación

La capa de presentación está compuesta por el módulo app.py. Los puntos de entrada son: métodos liveness y rediness para conocer el estado del microservicio (en el ejemplo no realizan ninguna operación) y el método para la operación de negocio use_case_example; este método, realiza la obtención de los parámetros de la petición HTTP, creación del DTO de la petición e invocación al método de servicio; para finalizar, retorna el resultado. El snippet de la función es la siguiente:

@app.route("/use_case_example", methods=['POST'])
def do_use_case_example():
    """
    use case example
    curl --header "Content-Type: application/json" --request POST \
         --data '{"name":"xyz1", "operation":"+", "operator":"20"}' \
         http://localhost:5000/use_case_example
    :return: str
    """
    p_name = request.json['name']
    p_operation = request.json['operation']
    p_operator = int(request.json['operator'])
    current_app.logger.info(f"[*] /use_case_example")
    current_app.logger.info(f"[*] Request: Name={p_name} operation={p_operation} operator={p_operator}")
    current_app.logger.info(f"Name={p_name} operation={p_operation} operator={p_operator}")
    data_request = entity_model.UseCaseRequest(uuid=uuid.UUID,
                                               name=p_name,
                                               operation=p_operation,
                                               operator=p_operator)
    repository = use_case_repository.UseCaseRepository()
    response_use_case = use_case_service.do_something(data_request, repository)
    data = jsonify({'result': response_use_case.resul})
    return data, 200

Capa de servicio

La capa de servicio está compuesta por el módulo use_case_service.py el cual contiene la función que realiza la operación de negocio: creación de la entidad de dominio, inserción en el repositorio de datos y retorno del resultado. El snippet de la función es el siguiente:

def do_something(request: entity_model.UseCaseRequest,
                 repository: use_case_repository.AbstractUseCaseRepository) -> entity_model.UseCaseResponse:
    """
    Business operation.
    :param request: entity_model.UseCaseRequest
    :param repository: use_case_repository.AbstractUseCaseRepository

    :return: entity_model.UseCaseResponse
    """
    if request is None:
        raise use_case_exception.UseCaseRequestException()
    logging.info(f"[**] /use_case_service.do_something")
    entity = entity_model.UseCaseEntity(uuid=request.uuid,
                                        name=request.name,
                                        operation=request.operation,
                                        operator=request.operator,
                                        date_data=date.today())
    repository.add(entity)
    return entity_model.UseCaseResponse(str(entity.calculate))

Capa de repositorios

La capa de repositorio define el respositorio en donde se almacenan los datos la cual está compuesta por el módulo use_case_repository.py. El módulo contiene la definición del repositorio UseCaseRepository para la entidad UseCaseEntity y la clase de abstracta con las operaciones de los repositorios. El snippet del repositorio es el siguiente:

class UseCaseRepository(AbstractUseCaseRepository):
    """
    Definition of the operations that connect to database.
    """
    def __init__(self) -> None:
        self.database: [entity_model.UseCaseEntity] = []
    def add(self, entity: entity_model.UseCaseEntity) -> bool:
        result: bool = False
        logging.info(f"[***] /use_case_repository.add")
        if entity is not None:
            result = True
            self.database.append(entity)
        return result
    def get(self, p_uuid: str) -> entity_model.UseCaseEntity:
        index = 0
        enc = False
        result: entity_model.UseCaseEntity = None
        logging.info(f"[***] /use_case_repository.get")
        while (index < len(self.database)) and not enc:
            aux: entity_model.UseCaseEntity = self.database[index]
            if aux.uuid == uuid.UUID(p_uuid):
                result = aux
                enc = True
            index += 1
       return result

Pruebas

La plantilla contiene el directorio tests el cual contiene los test de la plantilla del microservicio. Para ejecutar los test se ejecuta el siguiente comando desde la carpeta raíz del proyecto:

>pytest --setup-show

Docker

Todo microservicio debe de tener la definición de la imagen para que sea ejecutado en un contenedor. Así, existe el fichero Dockerfile para definir dicha imagen. El contenido de la imagen contiene las operaciones de instalación de las herramientas para Python, instalación de las librerías, copiado de código fuente y variables de entorno y ejecutación. El snippet con el contenido del DOckerfile es el siguiente:

FROM python:3.8-alpine
RUN apk add --no-cache --virtual .build-deps gcc musl-dev python3-dev
RUN apk add libpq
COPY requirements.txt /tmp
RUN pip install -r /tmp/requirements.txt
RUN apk del --no-cache .build-deps
RUN mkdir -p /app
COPY . /app/
WORKDIR /app
ENV FLASK_APP=entrypoints/app.py FLASK_DEBUG=1 PYTHONUNBUFFERED=1
CMD flask run --host=0.0.0.0 --port=80

Makefile

Para facilitar las operaciones de Docker, se ha definido un fichero de tipo Makefile en el cual se definen las operaciones necesarias para operar con Docker. Las operaciones son las siguientes:

  • Operación build.- Creación de la imagen. Para ejecutar la operación se ejecuta el comando make build desde la raíz del proyecto. El snippet de la definición de la operación build es la siguiente:
build:
     docker image build -t alvaroms/template-microservice:v1.0 .
  • Operación run.- Arranque de un contenedor con la imagen del proyecto. Para ejecutar la operación se ejecuta el comando make run desde la raíz del proyecto. El snippet de la definición de la operación run es la siguiente:
run:
   docker container run -d --name template-microservice -p 6060:80 alvaroms/template-microservice:v1.0
  • Operación exec.- Acceso a la consola del contenedor. Para ejecutar la operación se ejecuta el comando make exec desde la raíz del proyecto. El snippet de la definición de la operación exec es la siguiente:
exec:
    docker container exec -it template-microservice /bin/sh
  • Operación logs.- Visualización de los logs del contenedor. Para ejecutar la operación se ejecuta el comando make logs desde la raíz del proyecto. El snippet de la definición de la operación logs es la siguiente:
logs:
    docker container logs template-microservice
  • Operación test.- Ejecución de los test. Para ejecutar la operación se ejecuta el comando make test desde la raíz del proyecto. El snippet de la definición de la operación test es la siguiente:
test:
    pytest --setup-show
  • Operación all.- Ejecución de los test, construcción de la imagen y arranque del contenedor. Para ejecutar la operación se ejecuta el comando make all desde la raíz del proyecto. El snippet de la definición de la operación test es la siguiente:
all: test build run

Pruebas del API

Los comando curl para realizar las pruebas sobre el microservicio desplegado en el contenedor son los siguientes:

  • Root del microsercicio.
    curl http://localhost:6060/
  • Función rediness
    curl http://localhost:6060/readiness
  • Función liveness
    curl http://localhost:6060/liveness
  • Función de negocio.
    curl --header "Content-Type: application/json" --request POST \
    
    --data '{"name": "xyz1", "operation": "+", "operator": "20"}' \
    
    http://localhost:6060/use_case_example

Integración Contínua

Para finalizar, se define un pipeline de integración contínua definida en el fichero .travis.yml. El snippet con el contenido es el siguiente:

dist: xenial
language: python
python: 3.6
install:
- pip3 install -r requirements.txt

script:
- make test

branches:

 

Para el lector interesado puede acceder al código a través del siguiente enlace.

Redis: consumidor de mensajes

En la entrada anterior, Redis: productor de mensajes, describo cómo definir un productor para la publicación de mensajes en el broker Redis; en la presente entrada, Redis: consumidor de mensajes, describiré cómo consumir mensajes del broker.

El primer paso es crear el broker al cual publicar mensajes; para ello, trabajaré con una imagen Docker con Redis. Para descarga la imagen y arrancar el contenedor es necesario ejecutar los siguientes comandos:

docker pull redis
docker run --name some-redis -d redis

Tras su ejecución, tendremos Redis en una contenedor cuyo puerto de acceso es el 6379.

El segundo paso, es escribir el código del consumidor. Seguiremos los mismos criterios que en la entrada Redis: productor de mensajes.

Para crear la conexión con Redis, creamos un objeto de tipo Redis con los datos de la conexión a Redis. El snippet del código es el siguiente:

import redis
publish_redis = redis.Redis(host=config.HOST_REDIS, port=config.PORT_REDIS, db=0)

Una vez que tenemos la referencia a Redis, necesitamos suscribirnos al topic donde leer los mensajes; una vez suscritos, nos mantenemos a la espera de la recepción del mesaje; al recepcionar el mensaje, obtenemos un mensaje con una estructura de diccionario del cual deberemos de obtener el campo data. El snippet con el código es el siguiente:

consume_client_topic = publish_redis.pubsub()
consume_client_topic.subscribe(config.TOPIC_REDIS)
for message in consume_client_topic.listen():
  if message['data'] != 1:
    data = json.loads(message['data'].decode())
    logging.info(f"mesagge={data['message']} result={data['result']}")

Para el lector interesado, el código del enlace está en el siguiente enlace.

Redis: productor de mensajes

Redis es una herramienta Open source la cual puede ser utilizada como un almacén de estructura de datos en memoria, como una cache, como base de datos y como un broker
de mensajes. En la presente entrada, Redis: productor de mensajes en el broker, me centraré en describir cómo crear un productor de mensajes en el broker de mensajes de Redis. El ejemplo estará definido en lenguaje Python.

El primer paso es crear el broker al cual publicar mensajes; para ello, trabajaré con una imagen Docker con Redis. Para descargar la imagen y arrancar el contenedor es necesario ejecutar los siguientes comandos:

docker pull redis
docker run --name some-redis -d redis

Tras su ejecución, tendremos Redis en una contenedor cuyo puerto de acceso es el 6379.

El segundo paso es crear un proyecto Python en donde definiremos la dependencia del paquete redis y un fichero de tipo Python con el código del productor.

Para crear la conexión con Redis, creamos un objeto de tipo Redis con los datos de la conexión a Redis. El snippet del código es el siguiente:

import redis
publish_redis = redis.Redis(host=config.HOST_REDIS, port=config.PORT_REDIS, db=0)

Una vez creado la referecia a Redis, utilizaremos la función publish para publicar un mensaje en un topic de Redis. El snippet ejemplo es el siguiente:

msg = '{"message": "Test message-%d", "result": "OK"}' % index
publish_redis.publish(config.TOPIC_REDIS, msg)

El valor config.TOPIC_REDIS corresponde con un valor alfanumérico.

Para el lector interesado, el código del enlace está en el siguiente enlace.

En la siguiente entrada, Redis: consumidor de mensajes, realizaré la descripción de un consumidor de mensajes.

Azure Function en Python

La plataforma cloud de Azure ofrece un servicio para definir funciones, el servicio se llama «Function App». Las funciones permiten definir funciones para la construcción de arquitecturas Serverless. Las funciones pueden ser definidas en distintos lenguajes, como pueden ser: Java, Python, F#, C#,… En la presente entrada, Azure Function en Python, me centraré en cómo crear funciones Python y su despliegue en Azure.

La estructura de la entrada es la siguiente:

  1. Instalación de Azure Functions Tools
    • Creación del proyecto
    • Creación de una función
    • Ficheros de configuración
  2. Función Functionbase
  3. Función Functionstore
  4. Ejecución de las funciones en local
  5. Publicación en Azure

Azure no proporciona un intarfaz en donde se puede desarrollar una función, es necesario  crear un entorno con unas herramientas determinadas, desarrollar la función y, una vez creada, se realiza el despliegue a la plataforma.

Para definir funciones en Azure es necesario realizar las siguientes tareas en un entorno local:

  1. Instalar el paquete Azure Functions Tools.
  2. Desarrollar en local la función.
  3. Desplegar el código en Azure.

1.- Instalación de Azure Functions Tools.

Azure Functions Tools es un conjunto de herramientas para el desarrollo de funciones en Python. En mi caso, al ser un entorno Linux, necesito ejecutar la siguiente secuencia de comandos:

curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > microsoft.gpg
sudo mv microsoft.gpg /etc/apt/trusted.gpg.d/microsoft.gpg
sudo sh -c 'echo "deb [arch=amd64] https://packages.microsoft.com/repos/microsoft-ubuntu-$(lsb_release -cs)-prod $(lsb_release -cs) main" > /etc/apt/sources.list.d/dotnetdev.list'
sudo apt-get update
sudo apt-get install azure-functions-core-tools
sudo apt-get install python3-venv

La versión de Python es la versión 3.6 y, para asegurar la creación de entornos virtuales, he ejecutado el comando de instalación del paquete python3-venv.

Para asegurar que el paquete se ha instalado correctamente, podemos ejecutar en la línea de comandos el comando func -h; el resultado, es la visualización de la ayuda del comando.

Creación del proyecto

La creación de un proyecto consiste en la creación de una estructura de directorios con los elementos necesarios para poder trabajar. El comando de creación del proyecto es el
siguiente:

func init MyFunctionProj

Una vez ejecutado, se tendrá que seleccionar el lenguaje del proyecto, en nuestro caso, opción 3 Python y se creará un directorio con nombre MyFunctionProj con los elementos necesarios para poder trabajar. El contenido de la carpeta será el siguiente:

  • host.json.- Fichero JSON de configuración de extensiones y confuguración.
  • local.settings.json.- Fichero con la configuración de acceso a los servicios Azure.
  • requirements.txt.- Fichero con las dependencias de Python.

Creación de una función

Para realizar la creación de una función, desde la línea de comandos, nos ubicaremos en la carpeta raíz del proyecto; y, una vez allí, ejecutamos el siguiente comando:

func new

El comando nos obliga a seleccionar una plantilla de función entre las nueve existentes las cuáles son:

Select a template:
1. Azure Blob Storage trigger
2. Azure Cosmos DB trigger
3. Azure Event Grid trigger
4. Azure Event Hub trigger
5. HTTP trigger
6. Azure Queue Storage trigger
7. Azure Service Bus Queue trigger
8. Azure Service Bus Topic trigger
9. Timer trigger
Choose option:

Llegado a este punto es necesario realizar una parada para definir el significado del concepto de trigger. Un trigger es aquel elemento que permite desencadenar el funcionamiento de una función; un ejemplo de trigger, es una petición HTTP, es decir, cuando se realiza una petición HTTP se inicia la ejecución de una función; otro tipo de trigger puede ser Blob Storage Trigger, es aquel desencadenador que ejecuta una función por la existencia de un fichero en un blob determinado. De los posibles desencadenadores tenemos los siguiente tipos: HTTP, Cosmos DB, Blob Storage, un Timer, Event Grid, Event Hub, Service Bus Queue, Service Bus topic y Queue Storage. Una vez seleccionado el tipo, se deberá de informar del nombre de la función.

Los ejemplos que describiré son dos funciones:

  1. FunctionBase.- Función sencilla con desencadenador HTTP
  2. FunctionStore.- Función con desencadenador HTTP, un Blob Storage de entrada y otro de salida para realizar lecturas y escrituras en un Store de Azure.

La creación de cada función realiza la creación de un directorio con dos ficheros los cuáles son los siguientes:

  • __init__.py.- Fichero con el código Python de la función.
  • function.json.- Fichero con la configuración de la función: definición del desencadenador binding, route, nombre de la función a ejecutar en el fichero __init__.py

La estructura del proyecto con las dos funciones y los ficheros de configuración tiene un aspecto como el siguiente:

__app__
| - FunctionBase
| | - __init__.py
| | - function.json
| - FunctionStore
| | - __init__.py
| | - function.json
| - host.json
| - local.settings.json
| - requirements.txt
tests

Ficheros de configuración

  • host.json.- Fichero con la configuración de las funciones. El contenido es el siguiente:
{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  }
}
  • local.settings.json.- Fichero con las conexiones a los servicios de Azure.
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "<CONNECTION_STRING>",
    "FUNCTIONS_EXTENSION_VERSION": "~2",
    "APPINSIGHTS_INSTRUMENTATIONKEY": "a66ab777-1fa1-222c-3333-4e44d4c4444f"
  },
  "ConnectionStrings": {}
}
  • requirements.txt.- Definición de las dependencias de las librerías Python.
azure-functions
azure-functions-worker
azure-storage-blob
azure-cosmos
azure-storage
azure-storage-blob
pillow>=6.2.0

Para ejecutar el entorno virtual local desde la carpeta del proyecto en una consola, ejecutamos la siguiente secuencia de comandos:

virtualenv .venv -p python3
source .venv/bin/activate
pip install -r requirements.txt

2.- Función Functionbase

La función FunctionBase es una función sencilla que realiza la definición de un desencadenador de tipo HTTP y, su salida, es una respuesta HTTP. Realiza la lectura de petición HTTP y responde en función de los parámetros de entrada.

El contenido del fichero function.json es el siguiente:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Del snippet anterior hay que destacar lo siguiente:

  • El elemento «scriptFile» define el fichero con el código de la función.
  • El elemento «type» define el tipo de elemento, en nuestro caso, trigger de tipo HTTP o HTTP.
  • El elemento «direction» determina si es de entrada o de salida.
  • El elemento «name» identifica la referencia en el código.
  • El elemento «methods» identifica los métodos HTTP que define la función, en nuestro caso, GET o POST.

El contenido del fichero __init__.py es el siguiente:

import logging
import azure.functions as func
from datetime import datetime
def main(req: func.HttpRequest) -> func.HttpResponse:
  """
  + curl -v -w '\n' -d '{"name":"pp"}' -H 'DateData: 2019/11/26' -H 'Content-Type: application/json' -X POST http://localhost:7071/api/FunctionBase
  + curl -v -w '\n' -d '{"name":"pp"}' -H 'DateData: 2019/11/26' -H 'Content-Type: application/json' -X POST https://<URL-AZURE>.net/api/FunctionBase?code=<FUNCTION_KEY>
  """
  logging.info('Python HTTP trigger function processed a request.')
  datetime_object = datetime.strptime(req.headers.get('DateData'), '%Y/%m/%d')
  name = req.params.get('name')
  if not name:
    try:
      req_body = req.get_json()
    except ValueError:
      pass
  else:
    name = req_body.get('name')
  if name:
    return func.HttpResponse('{ "test": "' + name + '", "date": "' + str(datetime_object) + '"}')
  else:
    return func.HttpResponse(
      "Please pass a name on the query string or in the request body", status_code=400)

El código define una función main que recibe la referencia de la petición HTTP, realiza un procesamiento conforme a los parámetros de entrada y, por último, crea la respuesta.

3.- Función  Functionstore

La función FunctioStore realiza el copiado de un fichero en un Store de Azure cuyo nombre es pasado en una petición HTTP.

El contenido del fichero function.json es el siguiente:

{
  "disabled": false,
  "scriptFile": "__init__.py",
  "bindings": [
  {
    "authLevel": "function",
    "type": "httpTrigger",
    "direction": "in",
    "name": "request",
    "methods": [
      "get",
      "post"
    ],
    "route": "FunctionStore/{name:alpha?}"
  },
  {
    "name": "inputblob",
    "type": "blob",
    "path": "staging/{name}.png",
    "connection": "AzureWebJobsStorage",
    "direction": "in"
  },
  {
    "name": "blobout",
    "type": "blob",
    "direction": "out",
    "path": "staging/{name}-copy.txt",
    "connection": "AzureWebJobsStorage"
  },
  {
    "type": "http",
    "direction": "out",
    "name": "$return"
  }
 ]
}

En el código anterior y partiendo de la primera función, se añaden los dos blob de lectura y escritura, respectivamente, inputblob y outputblob. El elemento «path», corresponde con el path de la ubicación de los ficheros; el elemento «type», corresponde con el tipo blob; el elemento «connection», corresponde con la referencia a la conexión del blob; y, un detalle importante es el elemento «route», define el nombre del fichero pasado en la URL de la petición y dicho parámetro es usado en los blob de entrada y salida para referenciar el nombre del fichero de lectura y  de escritura.

El contenido del fichero con el código Python es el siguiente:

import logging
import azure.functions as func
from datetime import datetime
def main(request: func.HttpRequest, inputblob: func.InputStream, blobout: func.Out[func.InputStream], context: func.Context) -> func.HttpResponse:
  """
  curl -v -w '\n' -H 'DateData: 2019/11/26' -d '{ "param1": "value1", "param2": "value2" }' -X GET http://localhost:7071/api/FunctionStore/watermark?name=11
  """
  logging.info('Python HTTP trigger function processed a request.')
  logging.info(f'context.function_directory={context.function_directory}.')
  logging.info(f'context.function_name={context.function_name}.')
  logging.info(f'context.invocation_id={context.invocation_id}.')
  logging.info(f"Params: {request.params}")
  logging.info(f"Route Params: {request.route_params}")
  logging.info(f"Body: {request.get_body()}")
  logging.info(f"Headers: {request.headers}")
  logging.info(f"Headers: {request.headers.get('DateData')}")
  datetime_object = datetime.strptime(request.headers.get('DateData'), '%Y/%m/%d')
  logging.info(f"Fecha: {datetime_object}")
  name = datetime_object
  if not name:
    try:
      req_body = request.get_json()
    except ValueError:
      pass
  else:
    name = req_body.get('name')
  blobout.set(inputblob)
  if name:
    return func.HttpResponse(f"Hello {name}!")
  else:
    return func.HttpResponse(
      "Please pass a name on the query string or in the request body",status_code=400)

Del código anterior, destaca la siguiente línea: blobout.set(inputblob) ; ésta línea, realiza la lectura del fichero definido en inputblob y su copiado en blobout. El resto del código es la visualización de los parámetros de la petición HTTP y su tratamiento.

4.- Ejecución de las funciones en local

Para ejecutar las funciones en un entorno local es necesario ejecutar desde la carpeta del proyecto el siguiente comando:

func host start

Para ejecutar las funciones ejecutamos el comando curl desde la línea de comandos. Respectivamente, para la función base y Store, ejecutamos los siguientes comandos:

curl -v -w '\n' -d '{"name":"pp"}' -H 'DateData: 2019/11/26' -H 'Content-Type: application/json' -X POST http://localhost:7071/api/FunctionBase
curl -v -w '\n' -H 'DateData: 2019/11/26' -d '{ "param1": "value1", "param2": "value2" }' -X GET http://localhost:7071/api/FunctionStore/watermark?name=11

5.- Publicación en Azure

Para realizar el despliegue, es necesario crear una función en Azure. El código Python será el código del proyecto. El proceso de creación es sencillo desde el portal de Azure,  simplemente, hay que seleccionar el grupo de recurso, lenguaje, nombre,…

Una vez que la tengamos creada, desde la línea de comando realizaremos la operación de login con el comando «az login» y, una vez logueado, ejecutamos el comando de despliegue el cuál es el siguiente:func azure functionapp publish <APP_NAME>. Para nuestro caso, si la Function App tiene nombre MyFunctionProj el comando es el siguiente:

func azure functionapp publish MyFunctionProj

Para realizar las pruebas, se puede utilizar el comando curl, Postman o el propio interfaz de pruebas que proporciona Azure.

En la siguiente imagen, realizo un resumen de la entrada de los apartados anteriores representados como un sketchnoting.

 

Si el lector está interesado en el código, lo puede encontrar en el siguiente enlace.

Test unitarios y cobertura de código en Python

En la presente entrada, Test unitarios y cobertura de código en Python, realizaré la descripción de cómo se realizan test unitarios en Python con unittest y, además, cómo se realizan el análisis del código para generar el índice de cobertura de código con la herramienta coverage.

Los ejemplos estarán realizados con la versión 3.6 de Python.

logo-python

Test unitarios

Los test unitarios los definimos utilizando el framework unittest el cual está incorporado en la distribución de la versión del lenguaje.

Para realizar un test de un código, iniciaremos la definición de un código al cual se definirán el conjunto de test a definir. Este proceso inicial lo realizaré para comprender el proceso.

Definiré una clase de utilidad Util con un método statusToCode cuya funcionalidad consistirá en parsear un parámetro alfanumérico de entrada y, como salida, retornará un valor entero.

El snippet de la clase es la siguiente:

class Util:
  """Class utils."""
  @staticmethod
  def statusToCode(code="") -> int:
    """Return exit code"""
    assert len(code) > 0, "Argument not valid"
    result = {
      'UP': 0,
      'WARNING': 1,
      'CRITICAL': 2,
      'UNKOWN': 3,
    }.get(code, 3)
    return result

La clase Util está definida en el módulo util.py dentro de la carpeta lib. La función statusToCode tiene un decorador definido con nombre @staticmethod el cual permite definir el método en la clase con referencia estática para poderlo utilizar sin la necesidad de instanciar la clase.

Los test los definiremos en la carpeta lib_test la cual está definida al mismo nivel que la clase lib. Definiré las clases de test conforme a los módulos definidos. Así, tendremos la clase UtilTest en el módulo test_utils.py de la carpeta lib_test.

La clase UtilTest deberá de importar el módulo unittest y definir la clase heredando de la clase unittest.TestCase para poder realizar los test. Además, deberá de importar la clase con el código que se desea probar. Así, la clase queda definida de la siguiente forma:

import unittest
from lib.utils import Util

class UtilTests(unittest.TestCase):
  def setUp(self):
    pass

  def test_statusToCode_EMPTY(self):
    try:
      print(sys.executable)
      Util.statusToCode("")
    except AssertionError as exception:
      self.assertTrue(exception != None)

  def test_statusToCode_UP(self):
    self.assertEqual(Util.statusToCode("UP"), 0)

  def test_statusToCode_WARNING(self):
    self.assertEqual(Util.statusToCode("WARNING"), 1)

  def test_statusToCode_CRITICAL(self):
    self.assertEqual(Util.statusToCode("CRITICAL"), 2)

  def test_statusToCode_UNKNOWN(self):
    self.assertEqual(Util.statusToCode("UNKNOWN"), 3)

La clases UtilTests presenta seis métodos: el método setUp, el cual realiza la definición de las operaciones previas a la ejecución de los test, en nuestro caso no es necesario realizar ninguna; y, el resto de métodos, que definen los test al tener como prefijo la cadena «test_».

La verificación de los resultados se realiza empleando la referencia self la cual define las funciones de comprobación.

Cobertura

La cobertura de código la realizaremos con la herramienta coverage cuya referencia la pondremos en el fichero requirements.txt para que sea cargado en el entorno virtual del proyecto.

La herramienta coverage tiene la capacidad de realizar la generación de los informes por línea de comando, o bien, la generación de los informes en formato html; dichos informes, se generarán en la carpeta htmlcov del propio proyecto.

Los comandos que ejecutaremos son los siguientes:

  • coverage erase.- Eliminación de los datos previos de cobertura. Un ejemplo de ejecución en la línea de comandos es el siguiente: coverage erase
  • coverage run.- Arranque del programa Python que recolecta los datos. Un ejemplo de ejecución en la línea de comandos es el siguiente: coverage run –omit=’.tox/*,.venv/*’ -m unittest
  • coverage report.- Generación resultados. Un ejemplo de ejecución en la línea de comandos es el siguiente:  coverage report –omit=’.tox/*,venv/*’ -m
  • coverage html.- Generación de los informer en formato HTML. Un ejemplo de ejecución en la línea de comandos es el siguiente: coverage html –omit=’.tox/*,venv/*’

Un ejemplo de informe de cobertura tiene el siguiente aspecto:

python coverage html

Automatización del proceso de Cobertura

En el apartado anterior, he definido la forma de ejecutar los test y la Generación de los informes de cobertura y, en el presente apartado, realizaré la descripción de cómo lo podemos automatizar.

La automatización la realizamos empleando la herramienta tox(https://tox.readthedocs.io/en/latest/) Para poder utilizar tox, primeramente, es necerios definir en el fichero requirement.txt la herramienta tox; una vez instalado en el entorno virtual, deberemos definir el plan de ejecución de tox el cual se define en el fichero tox.ini ubicado en la carpeta raíz del proyecto.

El aspecto del fichero tox es el siguiente:

[tox]
envlist = py36, coverage-report
skipsdist = True

[testenv]
commands = python -m pytest {posargs}
deps =
-r{toxinidir}/requirements.txt
freezegun==0.3.9
pytest==3.5.0
passenv=*

[testenv:coverage-report]
skip_install = true
commands =
coverage erase
coverage run --omit='.tox/*,.venv/*' -m unittest
coverage report --omit='.tox/*,venv/*' -m
coverage html --omit='.tox/*,venv/*'

El fichero tox está compuesto de tres elementos de configuración los cuáles tienen la siguiente descripción:

  • Elemento tox: Definición del entorno virtual de ejecución, elemento a ejecutar y el flag de generación del artefacto para la distribución
  • Elemento testenv: Definición de la configuración necesaria por tox.
  • Elemento coverage-report: Definición de la secuencia de comandos de la herramienta coverage para el cálculo y generación de informes de cobertura.

Para ejecutar el proceso automático tecleamos en la línea de comando y posicionados en la carpeta de proyecto el comando tox.

Conclusión

El proceso de generación de test unitartios en Python es un proceso parecido a otros lenguajes como Java o Scala. Además, al estár el framework incorporado en la distribución no requiere de ninguna operación de carga, facilitando su uso.

 

AWS Redshift y Luigi, módulo Redshift: operación de consulta

En la presente entrada, AWS Redshift y Luigi, módulo Redshift: operación de consulta , realizaré la descripción de cómo se define una tarea para realizar una consulta o ejecución de una sentencia SQL sobre Amazon Redshift.

Para realizar las operaciones de consulta en Redshift es necesario la implementación de una clase que herede de la clase RedshiftQuery del módulo de Redshift de Luigi. La importación de la clase, se realiza de la siguiente forma:

from luigi.contrib.redshift import RedshiftQuery

La solución que propongo es una jerarquía de clases para poder definir varias consultas. La solución está compuesta por una clase que hereda de RedshiftQuery y la clase que ejecuta una consulta. La clase genérica de consultas con la carga de la configuración es la siguiente:

class AbstractRedshiftQuery(RedshiftQuery):
  """Clase genérica para la realización de consultas en Redshift"""
  config = RedshiftConnection()
  host = config.host
  database = config.database
  user = config.user
  password = config.password
  def output(self):
    return RedshiftTarget(
    host=self.host,
    database=self.database,
    user=self.user,
    password=self.password,
    table=self.table,
    update_id=self.update_id,
    port=5439)

En el snippet anterior, se define la clase AbstractRedshiftQuery que hereda de RedshiftQuery; y, además, se define la clase RedshiftConnection con los datos de configuración de la conexión a Redshift. Los datos de configuración son: host, database, user y password.

Una vez definida la clase AbstractRedshiftQuery, definimos la clase con la consulta. La clase con la sentencia SQL es la siguiente:

class CreateHistorico(AbstractRedshiftQuery):
  """
  La tarea CreateHistorico realiza la creación de una tabla.
  """
  table = "tabla_historia"
  query = """
    CREATE TABLE tabla_historia
    (
     sociedad varchar(60),
     fecha timestamp
    )
  """

En el snippet anterior, se define la clase CreateHistorico que realiza la creación de una tabla en Redshift la cual hereda de la clase AbstractRedshiftQuery. Para realizar la consulta, es necesario definir el atributo query con la sentencia a ejecutar y el atributo table. El resultado es la creación de la tabla tabla_historica en Redshift.

Para su ejecución, se realiza por los procedimientos normales de arranca de Luigi.

En este caso, se ha realizado la creación de una tabla pero, se puede definir cualquier otra sentencia: inserción, borrado,…

Para el lector interesado, las entradas publicadas hasta la fecha son las siguientes:

AWS S3 y Luigi, módulo S3: eliminación y copiado de carpetas

En la presente entrada, AWS S3 y Luigi, módulo S3: eliminación y copiado de carpetas, realizaré la descripción de la tareas para realizar con Luigi las operaciones de eliminación y copiado de carpetas de un bucket en  Amazon S3 con Luigi. Para realizar las operaciones sobre S3 de Amazon con Luigi, necesitamos instanciar un cliente S3Client dentro de una tarea.

En nuestro caso, definiré una jerarquía de clases en donde la clase padre define la referencia a un cliente S3 de Luigi: de esta forma, las clases hijos simplemente deben de pasar los parámetros con los datos necesarios. La importación del cliente es la siguiente:

from luigi.contrib.s3 import S3Client

La cabecera de la clase con la definición de la referencia al cliente es la siguiente:

class AbstractS3Client(luigi.Task):
  """
  La clase AbstractS3Client realiza la operaciones básicas en S3.
  """
  s3config = S3Configuration()
  s3Client = S3Client(profile_name=s3config.profileName, host=s3config.awsHostS3)
  [...]

Operación de eliminación

La operación de eliminación del contenido de una carpeta de S3 se realiza empleando la función remove del cliente S3Client. Además del path de la carpeta que se quiere eliminar, es necesario el nombre del Bucket de S3.

El snippet con el código con la funcionalidad de la eliminación de la carpeta es la siguiente:

def remove(self, bucketName, path):
  """
  El método removeProcess realiza la eliminación del contenido definido en un path pasado por parámetro de un
  bucket determinado.
  :param bucketName: Nombre del bucket
  :param path: Path
  :return:
  """
  path = "s3://{bucketName}/{path}/".format(bucketName=bucketName, path=path)
  if self.s3Client.exists(path):
    logging.info("\n[AbstractS3Client.removeProcess] Verificado el path={path}".format(path=path))
    resultRemove = self.s3Client.remove(path)
    if resultRemove:
      logging.info("[AbstractS3Client.removeProcess] Eliminación de {path}: OK".format(path=path))
    else:
      logging.info("[AbstractS3Client.removeProcess] Eliminación de {path}: KO".format(path=path))
  else:
    logging.info("[AbstractS3Client.removeProcess] NO Verificado el path={path}".format(path=path))

En el snippet anterior, código que se integra en la clase AbstractS3Client, define el path de la carpeta del bucket a eliminar con la variable path; se realiza la comprobación de existencia del bucket que se quiere eliminar con la función exists; si la comprobación anterior es correcta, se realiza la eliminación con la función remove, en otro caso, se muestra un mensaje informativo.

Operación de copia

La operación de copiado del contenido de una carpeta origen a otra carpeta destino, se realiza con la función copy del cliente S3Client. Además, del path de la carpeta origen y destino, es necesario el nombre del bucket de S3.

El snippet con el código con la funcionalidad de la operación de copiado de las carpetas, es el siguiente:

def copy(self, bucketName, sourceFolder, targetFolder):
  """
  El método copyProcess realiza el copiado del contenido de una carpeta origen(sourceFolder) a una carpeta
  destino (targetFolder) de un bucket determinado (bucketName).
  :param bucketName: Nombre del bucket
  :param sourceFolder: Carpeta origen
  :param targetFolder: Carpeta destino.
  :return:
  """
  sourceFile = "s3://{bucketName}/{sourceFolder}/".format(bucketName=bucketName, sourceFolder=sourceFolder)
  if self.s3Client.isdir(sourceFile):
    targetFile = "s3://{bucketName}/{targetFolder}/".format(bucketName=bucketName, targetFolder=targetFolder)
    if self.s3Client.isdir(targetFile):
      resultadoCopiado = self.s3Client.copy(sourceFile, targetFile)
      logging.info("[AbstractS3Client.copy] Copiado contenido DESDE:{sourceFile} A:{targetFile}".format(sourceFile=sourceFile, targetFile=targetFile))
      logging.info("[AbstractS3Client.copy] Numero de ficheros copiados={resultadoCopiado0}".format(resultadoCopiado0=resultadoCopiado[0]))
      logging.info("[AbstractS3Client.copy] Tamaño copiado={resultadoCopiado1}".format(resultadoCopiado1=resultadoCopiado[1]))
    else:
      logging.info("[AbstractS3Client.copy] {targetFile} no es un directorio.".format(targetFile=targetFile))
  else:
    logging.info("[AbstractS3Client.copy] {sourceFile} no es un directorio.".format(sourceFile=sourceFile))

En el snippet anterior, código que se integra en la clase AbstractS3Client, define el path de la carpeta origen de copiado definido en la variabla sourceFile; se realiza la comprobación de existencia del directorio con la función isDir; si el path del directorio es correcto, se calcula el path de la carpeta destino de copiado definido en la variable targetFile; si el path del directorio destino es correcto empleando la función isDir, se realiza el copiado del contenido de la carpeta origen en la destino; y, de todas las comprobaciones, se muestra el mensaje de error si no se cumple.

Instancia de clases

Para ejecutar la funcionalidad descrita en el apartado anterior, es necesario definir una clase que herede de la clase AbstractS3Client. Al ser esta clase una tarea de Luigi, es necesario la implementación del método run. Si se desea que la tarea esté enlazada con otras tareas, es necesario definir el método output.

El snippet de una clase que hereda de la clase AbstractS3Client para realizar la eliminación de una carpeta es la siguiente:

class DeleteS3(AbstractS3Client):
  def run(self):
    s3Configuration = S3Configuration()
    self.remove(s3Configuration.bucketName, s3Configuration.sourceFolder)

El snippet anterior, define la clase S3Configuration la cual define las siguientes propiedades: bucketName, nombre del bucket; sourceFolder, nombre de la carpeta a eliminar; y, además, define la invocación del método remove para ejecutar la eliminación.

El snippet de una clase que hereda de la clase AbstractS3Client para realizar el copiado del contenido de una carpeta es el siguiente:

class CopyS3(AbstractS3Client):
  def run(self):
    s3Configuration = S3Configuration()
    self.copyProcess(s3Configuration.bucketName, s3Configuration.sourceFolder, s3Configuration.targetFolder)

El snippet anterior, define la clase S3Configuration la cual define las siguientes propiedades: bucketName, nombre del bucket; sourceFolder, nombre de la carpeta origen a copiar; targetFolder, carpeta destino de copiado; y, además, define la invocación del método remove para ejecutar la eliminación.

Las operaciones con Amazon S3 son sencillas de realizar con Luigi. Lo que se tiene que tener claro son todos aquellos datos de configuración para la ejecución de las operaciones.

Para el lector interesado, las entrada publicadas hasta la fecha sobre Luigi son las siguientes:

 

Luigi

En el mundo Big Data y BI, los ETL son unas de las operaciones iniciales para preparar los datos para las tareas de inteligencia; con dichos datos, una vez cargados, se realizan las operaciones para la ayuda en la toma de decisiones. En la presente entrada, «Luigi», defino qué es un ETL y presentar una solución para realizar ETL`s con la solución Luigi.

Definimos un ETL, según la definición en wikipedia, como aquel proceso de extracción, transformación y carga de datos de una fuente de datos a una base da datos, data mart o data warehouse.

La solución tecnológica para la implementación de ETL que describiré en la presente entrada es Luigi  la cual pertenece al contexto del lenguaje Python. Luigi es un paquete de Python el cual permite la construcción de trabajos batch complejos, visualización del estado de procesamiento y encadenamientos de tareas. Luigi es una solución que ha sido adaptada por empresas como Spotify o Red Hat.

El scketchnote de la entrada es el representado en la siguiente imagen:

Instalación

El proceso de instalación de del paquete Luigi es el típico de instalación de un paquete en Python mediante la herramienta pip. Así, el comando de instalación es el siguiente:

pip install luigi

Elementos

Los elementos básicos que intervienen en la definición de un flujo de proceso en Luigi son tres: target, corresponde a una fuente de entrada o salida de datos; un task, tarea de procesamiento; y, parameter, correspondiente a los parámetros de intercambio entre las distintas tareas del flujo de trabajo.

Target

Un Target es aquella clase abstracta que implementa un punto de entrada o salida del flujo; por ejemplo, puede ser: un fichero, una base de datos, un bucket en AWS S3,…

Un Target está compuesta por una jerarquía de clases, como por ejemplo: LocalTarget, S3Target, MySqlTarget, RedshiftTarget; en definitiva, son clases específicas para aquella fuente de datos con las que un flujo trabaja.

Task

Un Task es aquella unidad computacional que realiza una operación con unos datos determinados. Task es una clase abstracta cuyas clases hijas deben de implementar unos métodos básicos: require, para definir las dependencias con otras tareas; output, para definir el resultado del procesamiento; y, por último, run, para definir el procesamiento propiamente dicho.

Parameter

Un Parameter son aquellos elementos de entrada de una tarea. Un Parameter, de la misma forma que los elementos anteriores, es una clase abstracta.

Ejemplos

Operación Suma

La operación de suma consiste en definir una tarea que recibiendo dos parámetros enteros como entrada realice la suma de los mismos. Para realizarlo, definiremos un fichero Python tasksuma.py con el siguiente contenido:

import luigi
class Sumatorio(luigi.Task):
  x = luigi.IntParameter(default=10)
  y = luigi.IntParameter(default=45)

 def run(self):
     print("[***]")
     print("[***] Resultado de la suma:", self.x + self.y)
     print("[***]")

El contenido consiste en la definición de una clase Sumatorio que hereda de la clase Task. La clase Sumatorio recibe dos parámetros enteros; dichos parámetros, tienen unos valores por defecto en el caso de que no existan los valores de entrada. El método run realiza la operación de suma y la escritura por consola.

Para la ejecución de la tarea, en la carpeta raíz del proyecto Python, ejecutamos el siguiente comando:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --x 69 --y 10 --local-scheduler

La salida por consola contiene el resultado de la ejecución de la tarea y las trazas del procesamiento. Así, sin entrar en detalle de todos los mensajes, la salida es la siguiente:

INFO: Informed scheduler that task Sumatorio_69_10_0543bfea4e has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) running Sumatorio(x=69, y=10)
[***]
[***] Resultado de la suma: 79
[***]
INFO: [pid 8786] Worker Worker(salt=307314540, workers=1, host=INV01830, username=alvaromonsalve, pid=8786) done Sumatorio(x=69, y=10)
[...]
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
 - 1 Sumatorio(x=69, y=10)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

Si no pasamos los valores enteros por parámetro a la tarea, el comando de ejecución sería el siguiente:

PYTHONPATH='.' luigi --module tasksuma Sumatorio --local-scheduler

La salida por consola sería parecida a la anterior, residiendo la diferencia en los valores numéricos de los campos que serían los valores por defecto.

Operaciones anidadas

Todo proceso de transformación requiere la realización de varias actividades y, en muchas ocasiones, esas actividades tienen que ser realizadas por tareas encadenadas. En el siguiente ejemplo, se realizan dos tareas encadenadas definidas en el fichero Python task_anidadas.py

La primera tarea, clase GenerateWords, realiza la creación de un fichero palabras.txt con el contenido de una lista de palabras; y, la segunda tarea, clase CountLetters, realiza la lectura del fichero generado en la tarea anterior, contabiliza la lectura longitud de las palabras y, como resultado, crea un fichero con la palabra y su longitud.

La clase GenerateWords se define de la siguiente forma:

class GenerateWords(luigi.Task):
 def output(self):
    return luigi.LocalTarget('/tmp/LuigiEjem1/palabras.txt')

 def run(self):
    palabras = ['manzana', 'plátano', 'mandarina']
    with self.output().open('w') as f:
      for palabra in palabras:
      f.write('{word}\n'.format(word=palabra))

La clase GenerateWords está compuesta de dos métodos: el método output, en el cual se define el target de salida, es decir, se define el fichero de salida del procesamiento; y, en el método run, se realiza la escritura en el fichero definido en el método output el contenido de la lista palabras.

La clase CountLetters se define de la siguiente forma:

class CountLetters(luigi.Task):
 def requires(self):
   return GenerateWords()

 def output(self):
   return luigi.LocalTarget('/tmp/LuigiEjem1/contador_letras.txt')

 def run(self):
   print("[***]")
   print("[***]self.input().open('r')=", self.input().open('r'))
   print("[***]")
   with self.input().open('r') as ficheroEntrada:
   palabras = ficheroEntrada.read().splitlines()
   with self.output().open('w') as ficheroSalida:
      for palabra in palabras:
      ficheroSalida.write(
       '{word} | {letter_count}\n'.format(
       word=palabra,
       letter_count=len(palabra)
      )
     )

La clase CountLetters está compuesta de tres métodos: requires, método en el que se define la dependencia con la tarea definida en la clase GenerateWords; método output, método en el que se define el target de salida, es decir, el fichero contador_letras; y, finalmente, método run, método que realiza la lectura del fichero generado por la tarea GenerateWord, contabilización de las longitudes de las palabras existentes y escritura en el fichero de salida contador_letras.txt

Para la ejecución del proceso definido por las anteriores tareas, se ejecuta el siguiente comando en la carpeta raíz del proyecto:

PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler

El resultado es la creación en la carpeta ‘/tmp/LuigiEjem1’ de dos ficheros: palabras.txt y contador_letras.txt. El contenido del fichero palabras.txt es el siguiente:

manzana
plátano
mandarina

El contenido del fichero contador_letras.txt es el siguiente:

manzana | 7
plátano | 7
mandarina | 9

4.- Visualización

Para la visualización del estado de ejecución, Luigi posee una herramienta gráfica en la cual se muestra la información del estado de ejecución. Esta herramienta de visualización es un servidor que hay que arrancar; para ello, se emplea el siguiente comando:

luigid --background --port=8082 --logdir=logs

El aspecto del interfaz es el siguiente:

La información visual que muestra en el GUI es relacionada con las tareas que son ejecutas, el estado en el que se encuentra cada una de ellas y las dependencias gráficas entre cada una.

Para que el servidor monitorice el estado de ejecución de cada tarea, es necesario que en el arranque se especifique que el servidor controlará su ejecución. Así, para poderlo realizar y basándonos en los ejemplos anteriores, es necesario eliminar el flag –local-scheduler. Desde un punto de vista práctico, los escenarios son los siguientes:

  • Ejecución en local de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters --local-scheduler
  • Ejecución en el servidor Luigi de la tarea CountLetters, se realiza mediante el siguiente comando:
PYTHONPATH='.' luigi --module etl.task_anidadas CountLetters

Conclusiones

Los ETL se basan en procesos de datos simples: lectura de fuente de datos, transformación y escritura en un repositorio de datos destinos. Dichos procesos, en función de la complejidad del negocio, su nivel de dificultad puede variar.

Luigi es una solución del ecosistema de soluciones del lenguaje Python; pero, mientras que aprendía Luigi, me he acortado por el parecido funcionamiento a SpringBatch, solución propuesta por Spring Framework en el ecosistema Java. Para el lector interesado, publiqué hace tiempo unas entradas sobre SpringBatch

Desde mi punto de vista, la dificultad no reside en la solución tecnológica, la dificultad reside en determinar qué datos son los que se tienen que cargar y qué transformaciones se tienen que aplicar a los datos.