AWS Lambda en Scala. Operaciones con AWS S3

Las tres grandes soluciones utilizadas en el mundo empresarial para definir sistemas cloud son Amazon AWS, Microsoft Azure y Google Cloud. Las tres soluciones permiten la posibilidad de desarrollar arquitecturas serverless la cual se implementan con funciones lambda. En la presente entrada, AWS Lambda en Scala. Operaciones con AWS S3, describiré cómo definir una función lambda en Amazon AWS.

 

 

 

 

 

Definimos arquitectura Serverless como aquella arquitectura que define sistemas con aplicaciones y servicios, con capacidad de ejecución, así como, la posibilidad de crear nuevas aplicaciones y servicios, sin necesidad de administrar infraestructura.

Definimos una función Lambda de AWS como “un un servicio informático que permite ejecutar código sin aprovisionar ni administrar servidores. AWS Lambda ejecuta el código sólo cuando es necesario, y se escala de manera automática, pasando de pocas solicitudes al día a miles por segundo”.

Las funciones Lambda pueden ser definidas en diferentes lenguajes como pueden ser: Java, Python, Node, Scala,… en las diferentes plataformas. Dada la diversidad de plataformas y lenguajes, las soluciones son amplias y diversas. Para unificar funcionalidad ante las plataformas y lenguajes existen frameworks que ofrecen operaciones para simplificar la tarea al desarrollador. Un ejemplo de este tipo de tecnología es el framework Serverless.

El framework Serverless es una herramienta Open Source la cual permite el desarrollo y despliegue de aplicaciones serverless en AWS, Azure, Google y otras más.

Instalación en entorno Linux/Mac

La instalación del framework en un entorno Linux o Mac es muy sencilla, simplemente es necesario ejecutar el siguiente comando desde la línea de comando:

curl -o- -L https://slss.io/install | bash

Para la verificación de la instalación, se ejecuta el siguiente comando:

serverless -h

El resultado del comando anterior deberá de mostrar la información de los comandos del framework.

Descripción funcional de la función de ejemplo

Definiremos una función que opere sobre la solución cloud de Amazon. Desde un punto de vista funcional, la función es sencilla, realizará ciertas operaciones con el servicio S3 de AWS descritas en el siguiente listado:

  • Listado de los bucket existentes.
  • Creación de un bucket en S3.
  • Subida de un fichero a S3.
  • Descarga de un fichero a S3.

Creación de la función con Serverless

Para realizar la creación de una función, utilizaremos el comando create del framework Serverless; para ello, en la consola del sistema, crearemos una carpeta (por ejemplo: serverless-scala-aws-s3) y ejecutaremos el comando create de serverless. El snippet copn los comandos son los siguientes:

cd serverless-scala-aws-s3
serverless create --template aws-scala-sbt --path lambda-s3

El comando create emplea la plantilla para un proyecto en Scala con sbt y define el path a la función. Además de la plantilla del lenguaje Scala, se pueden definir funciones en otros lenguajes como Python, Java, kotlin, Go,…

La vista de la estructura creada desde un IDE es el siguiente:

Los componentes del proyecto son los siguientes:

  • build.sbt Fichero sbt para la gestión del ciclo de vida del código de la función. Al tener que operar con S3 se debe de definir la dependencia de la librería AWScala en la referencia libraryDependencies. Las librerías utilizadas en este proyecto son las siguientes:
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-lambda-java-events" % "2.2.7",
  "com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
  "com.amazonaws" % "aws-lambda-java-log4j2" % "1.1.0",
  "com.github.seratch" %% "awscala" % "0.8.+"
)
  • Componentes de Scala. La plantilla del framework crea automáticamente cuatro componentes, siendo el más importante el Handler de la función.
    • Handler.- El componente handler define dos clases: Handler, para la función lambda a desarrollar; y, ApiGatewayHandler, para definir la clase para el servicio API Gateway; en nuestro caso, nos centraremos en la clase Handler.La clase Handler define un método handleRequest en el cual desarrollaremos la funcionalidad del ejemplo.
import scala.jdk.CollectionConverters._
class Handler extends RequestHandler[Request, Response] {
  val logger: Logger = LogManager.getLogger(getClass)
  def handleRequest(input: Request, context: Context): Response = {
    implicit val region = Region.US_EAST_1
    implicit val s3 = S3()
    val buckets: Seq[Bucket] = s3.buckets
    logger.info(s"\n1 buckets: $buckets \n")
    val bucket: Bucket = s3.createBucket("prueba2fromlambdafunction")
    logger.info(s"\n2 bucket: $bucket \n")
    // Upload operation of the file example1-file.txt with name example1-uploaded-file.txt
    bucket.put("example1-uploaded-file.txt", new java.io.File("example1-file.txt"))
    val s3obj: Option[S3Object] = bucket.getObject("example1-uploaded-file.txt")
    logger.info(s"\n3 Uploaded: ${s3obj.getOrElse("Empty")} \n")
    logger.info(s"Received a request: $input")
    Response("Go Serverless v1.0!!!!! Your function executed successfully!!", input)
  }
}
    • Request.- Define la clase Request con los parámetros del evento de entrada.
import scala.beans.BeanProperty
class Request(@BeanProperty var key1: String, @BeanProperty var key2: String, @BeanProperty var key3: String) {
  def this() = this("", "", "")
  override def toString: String = s"Request($key1, $key2, $key3)"
}
    • Response.- Define la clase respuesta del tipo de retorno del Handler.
import scala.beans.BeanProperty
case class Response(@BeanProperty message: String, @BeanProperty request: Request)
    • ApiGatewayResponse.- Define la clase de respuesta para el caso de APIGateway.
case class ApiGatewayResponse(@BeanProperty statusCode: Integer, @BeanProperty body: String,
@BeanProperty headers: java.util.Map[String, String], @BeanProperty base64Encoded: Boolean = false)
  • Serverless.yml. El fichero serverless.yml es aquel lugar donde se configura la función para que sea desplegada en AWS. El fichero está compuesto por varias secciones en donde se define las variables, la función, o bien, aquellos recursos necesarios de AWS. Este fichero es el que empleará el framework Serverless para definir la plantilla de CloudFormation para su despliegue en AWS. La secciones son:
    • Service.- Definición del nombre del servicio de la función en AWS.
    • Provider.- definición de las variables internas a AWS.
    • Custom.- Definición de las variables específicas para la función como por ejemplo: nombre del proyecto, región,… proporcionada por los valores definidos en Provider, o bien, desde la línea de comando.
    • Environment.- Definición de las variables de entorno globales.
    • Package.- configuración del paquete a crear para realizar la subida a AWS. Se puede definir qué ficheros incluir o excluir, o bien, el nombre del jar con el que se trabaja.
    • Functions.- definición de la función Scala, definición de la referencia del rol, variables de entorno,…
    • Resources.- definición de los recursos empleados por la función en AWS; en nuestro caso, definición del role y las políticas de seguridad.

El contenido del fichero es el siguiente:

service: lambda-s3
provider:
  name: aws
  project: scalaproject
  runtime: java8
  stage: ${opt:stage, 'dev'}
  region: us-east-1
  timeout: 900
  iamRoleStatements:
    - Effect: Allow
      Action:
        - s3:GetObject
        - s3:PutObject
      Resource:
        - "arn:aws:s3:::prueba2fromlambdafunction/*"
custom:
  currentStage: ${opt:stage, self:provider.stage}
  currentProject: ${self:provider.project}
  currentRegion: ${opt:region, self:provider.region}
environment:

package:
  individually: true
  artifact: target/scala-2.13/lambda-s3.jar

functions:
  lambda-s3:
    handler: app.Handler
    role: LambdaRole
    environment:
      ENV: ${self:custom.currentStage}
resources:
  Resources:
    LambdaRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: ${self:custom.currentProject}-lambda-s3-${self:custom.currentStage}
          AssumeRolePolicyDocument:
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action: sts:AssumeRole
        Policies:
          - PolicyName: ${self:custom.currentProject}-lambda-s3-${self:custom.currentStage}
            PolicyDocument:
              Statement:
                - Effect: Allow
                  Action:
                    - logs:CreateLogGroup
                    - logs:CreateLogStream
                    - logs:PutLogEvents
                    - s3:*
                  # - ec2:DescribeNetworkInterfaces
                  # - ec2:CreateNetworkInterface
                  # - ec2:DeleteNetworkInterface
                  # - ec2:DescribeInstances
                  # - ec2:AttachNetworkInterface
                 Resource: "*"

De snippet anterior resaltar las líneas comentadas en la definición de los permisos; éstas líneas, corresponden a los permisos que se deben de añadir si se desea que la función Lambda se ejecute en una subred de una VPC determinada.

Ciclo de vida

Configuración del profile de AWS. Para trabajar con AWS es necesario instalar el cliente de AWS y definir las credenciales del usuario para poder realizar las operaciones de despliegue en la cuenta de Amazon.

  • Creación del artefacto. Para realizar el despliegue, es necesario construir el artefacto con los componentes Scala y su ensamblado con las librerías necesarias. El comando SBT a ejecutar en la carpeta de la función es el siguiente:
sbt assembly
  • Despliegue de la función en Amazon AWS. El proceso de despliegue consiste en crear o modificar los recursos en AWS o el código de la función utilizando el stack de cloudformation asociado al fichero serverless.yml. El comando a ejecutar en la carpeta de la función es el siguiente:
serverless deploy -r us-east-1
  • Eliminación de la función. Si se desea eliminar la función se puede eliminar la función y sus recursos asociados con el siguiente:
serverless remove

Librería AWScala

La librería AWScala es aquella librería que permite realizar las operaciones con S3 u otros servicios de AWS. En nuestro caso, nos centraremos en definir las operaciones en S3.

  • Instancia del cliente S3. La creación de un cliente para realizar operaciones con S3 se realiza creando un componente de tipo S3. Dado que la función tiene asociado un role con los permisos de acceso, no es necesario asignar las credenciales. El snippet de ejemplo es el siguiente:
import awscala._, s3._
import awscala.s3._
import awscala.Region
implicit val region = Region.US_EAST_1
implicit val s3 = S3()
  • Listado de los buckets existentes. Una vez creado el cliente, se realiza la conexión al servicio S3 y, con la función buckets, obtenemos una lista con los bucket existentes. El snippet de ejemplo es el siguiente:
val buckets: Seq[Bucket] = s3.buckets
  • Creación de un bucket. De la misma manera que el caso anterior, el cliente S3 tiene una función de creación de bucket cuyo nombre es createBucket al cual se le pasa un nombre único del bucket a crear. El snippet de ejemplo es el siguiente:
val bucket: Bucket = s3.createBucket("prueba2fromlambdafunction")
  • Subir un fichero a S3. Para subir un fichero a S3, el cliente S3 emplea la función put al cual, como primer parámetro, se le pasa el nombre que tendrá en el bucket; y, como segundo parámetro, se le pasa un objeto File con la referencia del fichero. En nuestro caso, existe un fichero de texto example1-file.txt en el proyecto. El snippet de ejemplo es el siguiente:
bucket.put("example1-uploaded-file.txt", new java.io.File("example1-file.txt"))
  • Descarga de un fichero de S3. Para descargar un fichero de S3, el cliente emplea la función getObject a la cual se le pasa como parámetro el nombre del elemento a descargar. El snippet de ejemplo es el siguiente:

val s3obj: Option[S3Object] = bucket.getObject(“example1-uploaded-file.txt”)

Ejecución

Una vez desplegada la función con el comando de serverless deploy, hay que entrar en la consola de AWS y navegaremos hasta la consola de funciones lambda;y,
una vez en la consola, tendremos la referencia a la función. Para las pruebas, crearemos un evento con unos datos de pruebas como los siguientes:

{
"key1": "value1",
"key2": "value2",
"key3": "value3"
}

Para ejecutar la función, pulsaremos el botón de Test en la parte superior derecha; tras la ejecución, se reportará el resultado de la función y se crearán en el cloudwatch las trazas de la función. El aspecto de la consola de AWS con la información de la función es el siguiente:

El resultado de la función en S3 es la creación de un fichero en el bucket prueba2fromlambdafunction. La vista de la consola S3 tras la ejecución de la función es la siguiente:

 

Si el lector está interesado en el código puede acceder al siguiente repositorio de GitHub.

Conclusiones

La selección del lenguaje con el que se opera con AWS depende del equipo de desarrollo ya que, en función del conocimiento de los posibles lenguajes, se seleccionará uno u otro. Desde mi experiencia en los equipos en lod que he trabajado, siempre se ha elegido el lenguaje Python por su sencillez de uso utilizando la librería boto3. Con el presente ejemplo, quiero poner de manifiesto la sencillez con el lenguaje Scala y, dado que estamos construyendo funciones lambda sin infraestructura, utilizar un lenguaje con paradigma funcional permite construir componentes software orientados a la funcionalidad a desarrollar.

AWS Redshift y Luigi, módulo Redshift: operación de consulta

En la presente entrada, AWS Redshift y Luigi, módulo Redshift: operación de consulta , realizaré la descripción de cómo se define una tarea para realizar una consulta o ejecución de una sentencia SQL sobre Amazon Redshift.

Para realizar las operaciones de consulta en Redshift es necesario la implementación de una clase que herede de la clase RedshiftQuery del módulo de Redshift de Luigi. La importación de la clase, se realiza de la siguiente forma:

from luigi.contrib.redshift import RedshiftQuery

La solución que propongo es una jerarquía de clases para poder definir varias consultas. La solución está compuesta por una clase que hereda de RedshiftQuery y la clase que ejecuta una consulta. La clase genérica de consultas con la carga de la configuración es la siguiente:

class AbstractRedshiftQuery(RedshiftQuery):
  """Clase genérica para la realización de consultas en Redshift"""
  config = RedshiftConnection()
  host = config.host
  database = config.database
  user = config.user
  password = config.password
  def output(self):
    return RedshiftTarget(
    host=self.host,
    database=self.database,
    user=self.user,
    password=self.password,
    table=self.table,
    update_id=self.update_id,
    port=5439)

En el snippet anterior, se define la clase AbstractRedshiftQuery que hereda de RedshiftQuery; y, además, se define la clase RedshiftConnection con los datos de configuración de la conexión a Redshift. Los datos de configuración son: host, database, user y password.

Una vez definida la clase AbstractRedshiftQuery, definimos la clase con la consulta. La clase con la sentencia SQL es la siguiente:

class CreateHistorico(AbstractRedshiftQuery):
  """
  La tarea CreateHistorico realiza la creación de una tabla.
  """
  table = "tabla_historia"
  query = """
    CREATE TABLE tabla_historia
    (
     sociedad varchar(60),
     fecha timestamp
    )
  """

En el snippet anterior, se define la clase CreateHistorico que realiza la creación de una tabla en Redshift la cual hereda de la clase AbstractRedshiftQuery. Para realizar la consulta, es necesario definir el atributo query con la sentencia a ejecutar y el atributo table. El resultado es la creación de la tabla tabla_historica en Redshift.

Para su ejecución, se realiza por los procedimientos normales de arranca de Luigi.

En este caso, se ha realizado la creación de una tabla pero, se puede definir cualquier otra sentencia: inserción, borrado,…

Para el lector interesado, las entradas publicadas hasta la fecha son las siguientes:

AWS S3 y Luigi, módulo S3: eliminación y copiado de carpetas

En la presente entrada, AWS S3 y Luigi, módulo S3: eliminación y copiado de carpetas, realizaré la descripción de la tareas para realizar con Luigi las operaciones de eliminación y copiado de carpetas de un bucket en  Amazon S3 con Luigi. Para realizar las operaciones sobre S3 de Amazon con Luigi, necesitamos instanciar un cliente S3Client dentro de una tarea.

En nuestro caso, definiré una jerarquía de clases en donde la clase padre define la referencia a un cliente S3 de Luigi: de esta forma, las clases hijos simplemente deben de pasar los parámetros con los datos necesarios. La importación del cliente es la siguiente:

from luigi.contrib.s3 import S3Client

La cabecera de la clase con la definición de la referencia al cliente es la siguiente:

class AbstractS3Client(luigi.Task):
  """
  La clase AbstractS3Client realiza la operaciones básicas en S3.
  """
  s3config = S3Configuration()
  s3Client = S3Client(profile_name=s3config.profileName, host=s3config.awsHostS3)
  [...]

Operación de eliminación

La operación de eliminación del contenido de una carpeta de S3 se realiza empleando la función remove del cliente S3Client. Además del path de la carpeta que se quiere eliminar, es necesario el nombre del Bucket de S3.

El snippet con el código con la funcionalidad de la eliminación de la carpeta es la siguiente:

def remove(self, bucketName, path):
  """
  El método removeProcess realiza la eliminación del contenido definido en un path pasado por parámetro de un
  bucket determinado.
  :param bucketName: Nombre del bucket
  :param path: Path
  :return:
  """
  path = "s3://{bucketName}/{path}/".format(bucketName=bucketName, path=path)
  if self.s3Client.exists(path):
    logging.info("\n[AbstractS3Client.removeProcess] Verificado el path={path}".format(path=path))
    resultRemove = self.s3Client.remove(path)
    if resultRemove:
      logging.info("[AbstractS3Client.removeProcess] Eliminación de {path}: OK".format(path=path))
    else:
      logging.info("[AbstractS3Client.removeProcess] Eliminación de {path}: KO".format(path=path))
  else:
    logging.info("[AbstractS3Client.removeProcess] NO Verificado el path={path}".format(path=path))

En el snippet anterior, código que se integra en la clase AbstractS3Client, define el path de la carpeta del bucket a eliminar con la variable path; se realiza la comprobación de existencia del bucket que se quiere eliminar con la función exists; si la comprobación anterior es correcta, se realiza la eliminación con la función remove, en otro caso, se muestra un mensaje informativo.

Operación de copia

La operación de copiado del contenido de una carpeta origen a otra carpeta destino, se realiza con la función copy del cliente S3Client. Además, del path de la carpeta origen y destino, es necesario el nombre del bucket de S3.

El snippet con el código con la funcionalidad de la operación de copiado de las carpetas, es el siguiente:

def copy(self, bucketName, sourceFolder, targetFolder):
  """
  El método copyProcess realiza el copiado del contenido de una carpeta origen(sourceFolder) a una carpeta
  destino (targetFolder) de un bucket determinado (bucketName).
  :param bucketName: Nombre del bucket
  :param sourceFolder: Carpeta origen
  :param targetFolder: Carpeta destino.
  :return:
  """
  sourceFile = "s3://{bucketName}/{sourceFolder}/".format(bucketName=bucketName, sourceFolder=sourceFolder)
  if self.s3Client.isdir(sourceFile):
    targetFile = "s3://{bucketName}/{targetFolder}/".format(bucketName=bucketName, targetFolder=targetFolder)
    if self.s3Client.isdir(targetFile):
      resultadoCopiado = self.s3Client.copy(sourceFile, targetFile)
      logging.info("[AbstractS3Client.copy] Copiado contenido DESDE:{sourceFile} A:{targetFile}".format(sourceFile=sourceFile, targetFile=targetFile))
      logging.info("[AbstractS3Client.copy] Numero de ficheros copiados={resultadoCopiado0}".format(resultadoCopiado0=resultadoCopiado[0]))
      logging.info("[AbstractS3Client.copy] Tamaño copiado={resultadoCopiado1}".format(resultadoCopiado1=resultadoCopiado[1]))
    else:
      logging.info("[AbstractS3Client.copy] {targetFile} no es un directorio.".format(targetFile=targetFile))
  else:
    logging.info("[AbstractS3Client.copy] {sourceFile} no es un directorio.".format(sourceFile=sourceFile))

En el snippet anterior, código que se integra en la clase AbstractS3Client, define el path de la carpeta origen de copiado definido en la variabla sourceFile; se realiza la comprobación de existencia del directorio con la función isDir; si el path del directorio es correcto, se calcula el path de la carpeta destino de copiado definido en la variable targetFile; si el path del directorio destino es correcto empleando la función isDir, se realiza el copiado del contenido de la carpeta origen en la destino; y, de todas las comprobaciones, se muestra el mensaje de error si no se cumple.

Instancia de clases

Para ejecutar la funcionalidad descrita en el apartado anterior, es necesario definir una clase que herede de la clase AbstractS3Client. Al ser esta clase una tarea de Luigi, es necesario la implementación del método run. Si se desea que la tarea esté enlazada con otras tareas, es necesario definir el método output.

El snippet de una clase que hereda de la clase AbstractS3Client para realizar la eliminación de una carpeta es la siguiente:

class DeleteS3(AbstractS3Client):
  def run(self):
    s3Configuration = S3Configuration()
    self.remove(s3Configuration.bucketName, s3Configuration.sourceFolder)

El snippet anterior, define la clase S3Configuration la cual define las siguientes propiedades: bucketName, nombre del bucket; sourceFolder, nombre de la carpeta a eliminar; y, además, define la invocación del método remove para ejecutar la eliminación.

El snippet de una clase que hereda de la clase AbstractS3Client para realizar el copiado del contenido de una carpeta es el siguiente:

class CopyS3(AbstractS3Client):
  def run(self):
    s3Configuration = S3Configuration()
    self.copyProcess(s3Configuration.bucketName, s3Configuration.sourceFolder, s3Configuration.targetFolder)

El snippet anterior, define la clase S3Configuration la cual define las siguientes propiedades: bucketName, nombre del bucket; sourceFolder, nombre de la carpeta origen a copiar; targetFolder, carpeta destino de copiado; y, además, define la invocación del método remove para ejecutar la eliminación.

Las operaciones con Amazon S3 son sencillas de realizar con Luigi. Lo que se tiene que tener claro son todos aquellos datos de configuración para la ejecución de las operaciones.

Para el lector interesado, las entrada publicadas hasta la fecha sobre Luigi son las siguientes: