HTTP4S: Introducción

La primera solución para implementar servicios o microservicios en Scala es utilizar AKKA HTTP; pero, con las prestaciones de librerías como Cats, ha permitido la creación de librerías como HTTP4S. En la presente entrada, HTTP4s: Introducción, realizaré una breve descripción para la creación de servicios con la librería HTTP4S.

Definición de dependencias

La definición de dependencias y la gestión del ciclo de desarrollo del software se realizará con sbt. La versión de Scala a utilizar es la versión 2.13.4. Las librería utilizadas son las siguientes:

  • http4s_%.- librerías específicas de http4s.
  • munit_%.- librerías específicas para la realización de test.
  • circe_%.- librerías específicas para el tratamiento de componentes JSON.

El snippet del código con la definición de un módulo que utilice la librería HTTP4s es el siguiente:

val munit = "0.7.20"
val munit_cats_effect_version = "0.12.0"
val http4s = "0.21.14"
val circe = "0.13.0"
[…]
lazy val munit = "org.scalameta" %% "munit" % Versions.munit % Test
lazy val munit_cats_effect_2 = "org.typelevel" %% "munit-cats-effect-2" % Versions.munit_cats_effect_version % Test
lazy val http4s_blaze_server = "org.http4s" %% "http4s-blaze-server" % Versions.http4s
lazy val http4s_blaze_client = "org.http4s" %% "http4s-blaze-client" % Versions.http4s
lazy val http4s_dsl = "org.http4s" %% "http4s-dsl" % Versions.http4s
lazy val http4s_circe = "org.http4s" %% "http4s-circe" % Versions.http4s
lazy val circe_generic = "io.circe" %% "circe-generic" % Versions.circe
lazy val circe_literal = "io.circe" %% "circe-literal" % Versions.circe
[…]
lazy val http4s = (project in file("http4s"))
 .settings(
    name := "example-http4s",
    assemblySettings,
    scalacOptions ++= basicScalacOptions,
    testFrameworks += new TestFramework("munit.Framework"),
    libraryDependencies ++=
    http4sDependencies ++ Seq(
      scalaTest,
      munit,
      munit_cats_effect_2
    )
  )
  lazy val http4sDependencies = Seq(
     http4s_blaze_server,
     http4s_blaze_client,
     http4s_circe,
     http4s_dsl,
     circe_generic,
     circe_literal
   )

Definición de Servicios

No me centrará en definir qué es un micro-servicio o servicio ya que ha sido definido y descrito en otras entradas. Desde el punto de vista de la librería HTTP4s, un servicio es aquella definición de un método HTTP el cual, dado un path determinado, recibe una petición HTTP, realiza una funcionalidad determinada y, como resultado, retorna una respuesta HTTP.

HTTP4s está basada en la librería Cats y, al definir operaciones con efecto de lado, emplearemos como tipo parametrizado el componente IO.

El servicio lo definieremos con la función of del componente HttpRoutes, definiremos el tratamiento a realizar en función del método HTTP junto al path del recurso; para cada método, se realizará la creación de una respuesta con el componente Ok.

El servicio es definido y utilizado en un objeto de tipo aplicación de entrada salida IOApp el cual, con la definición de la función run, arranca la aplicación y levanta el servicio. Esta clase es propia de la librería Cats. La función run realiza la creación de un server con el objeto de tipo BlazeServerBuilder al cual se le configura con el servicio, el puerto y el host deseado.

En los siguiente apartados, se definen ejemplos de definición de servicios con HTTP4s.

Ejemplo de servicio básico

Ejemplo básico de tipo «Hello World» donde se define un servicio HTTP con método GET. El código de ejemplo es el siguiente:

import cats.effect._
import org.http4s._
import org.http4s.dsl.io._
import scala.concurrent.ExecutionContext.Implicits.global
import org.http4s.server.blaze._
import org.http4s.implicits._
object ServiceEjem1 extends IOApp {

  val helloWorldService = HttpRoutes
     .of[IO] { 
        case GET -> Root / "hello" / name => Ok(s"Hello, $name.")
     }.orNotFound

  def run(args: List[String]): IO[ExitCode] =
     BlazeServerBuilderIO
       .bindHttp(8080, "localhost")
       .withHttpApp(helloWorldService)
       .serve
       .compile
       .drain
       .as(ExitCode.Success)
}

Para probar el servicio anterior desde la línea de comando, se utiliza el siguiente comando curl:

curl http://localhost:8080/hello/Pete

Test

Para realizar el test del ejemplo anterior, usaremos el framework MUnit. La definición del test consiste en la declaración de una clase que herede de munit.FunSuite, en nuestro caso, la clase ServiceEjem1Test; ésta, define los test de prueba con la función test y, la verificación de resultado, se realiza con las funciones assert.

Dado que el servicio creado se implementa como una función Kleisli, no es necesario arrancar un server para realizar pruebas, con lo cual, el proceso de pruebas es un proceso de test de una función, simplificando dichas tareas.

La clase de test del servicio anterior es la siguiente:

import cats.effect._
import org.http4s._
import org.http4s.implicits._
class ServiceEjem1Test extends munit.FunSuite {

  test("Test ServiceEjem1 OK") {
     val service = ServiceEjem1.helloWorldService
     val requestTest = Request[IO](Method.GET, uri"/hello/test")
     val result = service.run(requestTest).unsafeRunSync()
     assertEquals(result.status, Status.Ok)
  }

  test("Test ServiceEjem1 KO") {
     val service = ServiceEjem1.helloWorldService
     val requestTest = Request[IO](Method.GET, uri"/hello")
     val result = service.run(requestTest).unsafeRunSync()
     assertEquals(result.status, Status.NotFound)
  }

}

La prueba consiste en la creación de una referencia al servicio, definido con el objeto service; definición de la petición HTTP, definido en el objeto requestTest; y, para su ejecución, emplearemos la función run() y unsafeRunSync(); el resultado, se recoge en el objeto result el cual es utilizado en las funciones assert.

Composición de servicios

Un servicio está implementado como una función Kleisli y, la composición de funciones Kleisli, se realiza combinado dichas funciones. Para ello, utilizamos la función <+> la cual está definida en la sintaxis de la librería cuyo paquete es cats.syntax.all._.

En el código ejemplo, se define un servicio básico tipo «Hola Mundo» en el objeto helloWorldService y se define un servicio tweeService. El servicio tweetService define dos métodos para dos endpoints diferentes. La composición se define en el objeto services con la función <+>.

Una vez definido la composición del servicio, se crea el enrutado de los servicios en el objeto httpApp; este objeto, será utilizada para levantar la aplicación en la función run.

El código de ejemplo es el siguiente:

object ServiceEjem2 extends IOApp {
   val helloWorldService = HttpRoutes.of[IO] { 
     case GET -> Root / "hello" / name => Ok(s"Hello, $name.")
   }
   case class Tweet(id: Int, message: String)
   […]

   def getTweet(tweetId: Int): IO[Tweet] = IO(Tweet(id = 1, message = "Tweet1"))

   def getPopularTweets(): IO[Seq[Tweet]] = IO(
      List(Tweet(id = 2, message = "Tweet2"), Tweet(id = 3, message = "Tweet3"))
   )

   val tweetService = HttpRoutes.of[IO] {
     case GET -> Root / "tweets" / "popular" =>
        getPopularTweets().flatMap(Ok()) 
     case GET -> Root / "tweets" / IntVar(tweetId) =>  
        getTweet(tweetId).flatMap(Ok())
   }
   val services = tweetService <+> helloWorldService

   val httpApp = Router("/" -> helloWorldService, "/api" -> services).orNotFound

   def run(args: List[String]): IO[ExitCode] =
      BlazeServerBuilder[IO](global)
        .bindHttp(7676, "localhost")
        .withHttpApp(httpApp)
        .serve
        .compile
        .drain
        .as(ExitCode.Success)
}

Test

La definición de los test se realiza con el mismo criterio del primer ejemplo. Para simplificar, me centraré los test del servicio compuesto y en la aplicación.

La estrategia de prueba sigue la misma secuencia, definición del servicio, definición de la petición de pruebas, ejecución y valoración de resultados.

El snippet de prueba es el siguiente:

[…]
test("Test ServiceEjem2.service (compose service) OK") {
   val service = ServiceEjem2.services
   val requestTestHello = Request[IO](Method.GET, uri"/hello/test")
   val resultHello = service.orNotFound(requestTestHello).unsafeRunSync()
   assertEquals(resultHello.status, Status.Ok)
   val requestTestTweet = Request[IO](Method.GET, uri"/tweets/popular")
   val resultTweet = service.orNotFound(requestTestTweet).unsafeRunSync()
   assertEquals(resultTweet.status, Status.Ok)
}
test("Test httpApp OK") {
   val httpApp = ServiceEjem2.httpApp
   val requestTestHello = Request[IO](Method.GET, uri"/hello/test")
   val resultHello = httpApp.run(requestTestHello).unsafeRunSync()
   assertEquals(resultHello.status, Status.Ok)
   val requestTestTweet = Request[IO](Method.GET, uri"/api/tweets/popular")
   val resultTweet = httpApp.run(requestTestTweet).unsafeRunSync()
   assertEquals(resultTweet.status, Status.Ok)
}
[…]

Definición de Middleware

En HTTP4S un middleware es un envoltorio de un servicio para poder manipular la petición enviada, o bien, la respuesta. La estrategia de definición de un middleware puede ser empleando una función, o bien, utilizando un objeto. En los siguientes apartados se muestran unos ejemplos prácticos.

Definición de un middleware mediante una función.

La forma básica para definir un middleware es empleando una función la cual recibe como parámetro un servicio y, como resultado, retorna una función kleisli representado en un HttpRoutes. El middleware ejecutará el servicio el cual, en función del resultado, modificará las cabeceras del objeto de respuesta. El resto de elementos sigue la misma estructura y función que los ejemplos de servicios.

El snippet del código es el siguiente:

object MiddlewareEjem1 extends IOApp {
   def myMiddle(service: HttpRoutes[IO], header: Header): HttpRoutes[IO] = Kleisli { (req: Request[IO]) =>
      service(req).map {
         case Status.Successful(resp) =>
             resp.putHeaders(header)
         case resp =>
             resp
      }
   }

   val service = HttpRoutes.of[IO] { 
     case GET -> Root / "hello" / name => Ok(s"Hello wrapper, $name.")
   }

   val wrappedService = myMiddle(service, Header("SomeKey", "SomeValue"))

   val apiService = HttpRoutes.of[IO] { 
     case GET -> Root / "rest1" => Ok("OK response API")
   }

   val httpRoute = Router("/" -> wrappedService, "/api" -> apiService).orNotFound

   def run(args: List[String]): IO[ExitCode] =
     BlazeServerBuilder[IO](global)
      .bindHttp(7676, "localhost")
      .withHttpApp(httpRoute)
      .serve
      .compile
      .drain
      .as(ExitCode.Success)
}

Test

Para definir las pruebas unitarias del middleware, igual que los servivios, se emplea el framework Munit; en nuestro caso, en el resultado se obtendrá las cabeceras asignadas por el middleware.

El snippet de las pruebas unitarias es el siguiente:

class MiddlewareEjem1Test extends munit.FunSuite {
   […]
   test("Test wrappedService MiddlewareEjem1 OK") {
     val service = MiddlewareEjem1.wrappedService
     val requestTest = Request[IO](Method.GET, uri"/hello/test")
     val result = service.orNotFound(requestTest).unsafeRunSync()
     assertEquals(result.status, Status.Ok)
     assertEquals(result.headers.get(CaseInsensitiveString("SomeKey")).get.value, "SomeValue")
   }
   […]
}

Definición de un middleware con un object.

Otra forma de definir el middleware es utilizando un objeto. El objeto implementará el método apply el cual tendrá la misma funcionalidad que la función del ejemplo anterior; en nuestro caso, se utiliza una función addHeader para modularizar más el código. El servicio que emplee este objeto, wrappedService, simplemente realiza la creación del objeto; posteriormente, se emplea en la creación del Router.

El snipper del código ejemplo es el siguiente:

object MiddlewareEjem2 extends IOApp {
   object MyMiddle {
     def addHeader(resp: Response[IO], header: Header): Response[IO] = resp match {
        case Status.Successful(resp) => resp.putHeaders(header)
        case resp => resp
     }

     def apply(service: HttpRoutes[IO], header: Header) =
        service.map(addHeader(_, header))
   }

   val service = HttpRoutes.of[IO] { 
      case GET -> Root / "middleware" / name =>
        Ok(s"Hello wrapper, $name.")
   }

   val wrappedService = MyMiddle(service, Header("SomeKey", "SomeValue"))

   val apiService = HttpRoutes.of[IO] { 
      case GET -> Root / "rest1" =>
        Ok("OK response API")
   }

   val httpRoute = Router("/" -> wrappedService, "/api" -> apiService).orNotFound
      def run(args: List[String]): IO[ExitCode] =
         BlazeServerBuilder[IO](global)
           .bindHttp(7676, "localhost")
           .withHttpApp(httpRoute)
           .serve
           .compile
           .drain
           .as(ExitCode.Success)
}

Test

La definicón del test del ejemplo anterior sigue la misma estructura que los ejemplos anteriores. El Snippet con el código de test es el siguiente:

[…]
test("Test wrappedService MiddlewareEjem1 OK") {
   val service = MiddlewareEjem2.wrappedService
   val requestTest = Request[IO](Method.GET, uri"/middleware/test")
   val result = service.orNotFound(requestTest).unsafeRunSync()
   assertEquals(result.status, Status.Ok)
   assertEquals(result.headers.get(CaseInsensitiveString("SomeKey")).get.value, "SomeValue")
}
[…]

Composición de un middleware y un servicio.

La composición de middleware sigue el mismo criterio que los servicios porque son funciones kleisli. El ejemplo de una composición de middleware es el siguiente:

object MiddlewareEjem3 extends IOApp {

  object MyMiddle {
    def addHeader(resp: Response[IO], header: Header): Response[IO] = resp match {
      case Status.Successful(resp) => resp.putHeaders(header)
      case resp => resp
    }

    def apply(service: HttpRoutes[IO], header: Header) =
      service.map(addHeader(_, header))
  }

  val service = HttpRoutes.of[IO] { case GET -> Root / "middleware" / name =>
      Ok(s"Hello wrapper, $name.")
  }

  val apiService = HttpRoutes.of[IO] { case GET -> Root / "rest1" =>
      Ok("OK response API")
  }

  val wrappedService = apiService <+> MyMiddle(service, Header("SomeKey", "SomeValue"))

  val httpRoute = Router("/api" -> wrappedService).orNotFound

  def run(args: List[String]): IO[ExitCode] =
     BlazeServerBuilder[IO](global)
      .bindHttp(7676, "localhost")
      .withHttpApp(httpRoute)
      .serve
      .compile
      .drain
      .as(ExitCode.Success)
}

La definición de los test de este ejemplo es análogo a los anteriores.

Para el lector interesado en el código ejemplo puede acceder a través del siguiente enlace.

En la siguiente entrada, HTTP4S: manejo de JSON, me centraré en cómo se manejan componentes JSON con HTTP4S.

AWS Lambda en Scala. Operaciones con AWS S3

Las tres grandes soluciones utilizadas en el mundo empresarial para definir sistemas cloud son Amazon AWS, Microsoft Azure y Google Cloud. Las tres soluciones permiten la posibilidad de desarrollar arquitecturas serverless la cual se implementan con funciones lambda. En la presente entrada, AWS Lambda en Scala. Operaciones con AWS S3, describiré cómo definir una función lambda en Amazon AWS.

 

 

 

 

 

Definimos arquitectura Serverless como aquella arquitectura que define sistemas con aplicaciones y servicios, con capacidad de ejecución, así como, la posibilidad de crear nuevas aplicaciones y servicios, sin necesidad de administrar infraestructura.

Definimos una función Lambda de AWS como “un un servicio informático que permite ejecutar código sin aprovisionar ni administrar servidores. AWS Lambda ejecuta el código sólo cuando es necesario, y se escala de manera automática, pasando de pocas solicitudes al día a miles por segundo”.

Las funciones Lambda pueden ser definidas en diferentes lenguajes como pueden ser: Java, Python, Node, Scala,… en las diferentes plataformas. Dada la diversidad de plataformas y lenguajes, las soluciones son amplias y diversas. Para unificar funcionalidad ante las plataformas y lenguajes existen frameworks que ofrecen operaciones para simplificar la tarea al desarrollador. Un ejemplo de este tipo de tecnología es el framework Serverless.

El framework Serverless es una herramienta Open Source la cual permite el desarrollo y despliegue de aplicaciones serverless en AWS, Azure, Google y otras más.

Instalación en entorno Linux/Mac

La instalación del framework en un entorno Linux o Mac es muy sencilla, simplemente es necesario ejecutar el siguiente comando desde la línea de comando:

curl -o- -L https://slss.io/install | bash

Para la verificación de la instalación, se ejecuta el siguiente comando:

serverless -h

El resultado del comando anterior deberá de mostrar la información de los comandos del framework.

Descripción funcional de la función de ejemplo

Definiremos una función que opere sobre la solución cloud de Amazon. Desde un punto de vista funcional, la función es sencilla, realizará ciertas operaciones con el servicio S3 de AWS descritas en el siguiente listado:

  • Listado de los bucket existentes.
  • Creación de un bucket en S3.
  • Subida de un fichero a S3.
  • Descarga de un fichero a S3.

Creación de la función con Serverless

Para realizar la creación de una función, utilizaremos el comando create del framework Serverless; para ello, en la consola del sistema, crearemos una carpeta (por ejemplo: serverless-scala-aws-s3) y ejecutaremos el comando create de serverless. El snippet copn los comandos son los siguientes:

cd serverless-scala-aws-s3
serverless create --template aws-scala-sbt --path lambda-s3

El comando create emplea la plantilla para un proyecto en Scala con sbt y define el path a la función. Además de la plantilla del lenguaje Scala, se pueden definir funciones en otros lenguajes como Python, Java, kotlin, Go,…

La vista de la estructura creada desde un IDE es el siguiente:

Los componentes del proyecto son los siguientes:

  • build.sbt Fichero sbt para la gestión del ciclo de vida del código de la función. Al tener que operar con S3 se debe de definir la dependencia de la librería AWScala en la referencia libraryDependencies. Las librerías utilizadas en este proyecto son las siguientes:
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-lambda-java-events" % "2.2.7",
  "com.amazonaws" % "aws-lambda-java-core" % "1.2.0",
  "com.amazonaws" % "aws-lambda-java-log4j2" % "1.1.0",
  "com.github.seratch" %% "awscala" % "0.8.+"
)
  • Componentes de Scala. La plantilla del framework crea automáticamente cuatro componentes, siendo el más importante el Handler de la función.
    • Handler.- El componente handler define dos clases: Handler, para la función lambda a desarrollar; y, ApiGatewayHandler, para definir la clase para el servicio API Gateway; en nuestro caso, nos centraremos en la clase Handler.La clase Handler define un método handleRequest en el cual desarrollaremos la funcionalidad del ejemplo.
import scala.jdk.CollectionConverters._
class Handler extends RequestHandler[Request, Response] {
  val logger: Logger = LogManager.getLogger(getClass)
  def handleRequest(input: Request, context: Context): Response = {
    implicit val region = Region.US_EAST_1
    implicit val s3 = S3()
    val buckets: Seq[Bucket] = s3.buckets
    logger.info(s"\n1 buckets: $buckets \n")
    val bucket: Bucket = s3.createBucket("prueba2fromlambdafunction")
    logger.info(s"\n2 bucket: $bucket \n")
    // Upload operation of the file example1-file.txt with name example1-uploaded-file.txt
    bucket.put("example1-uploaded-file.txt", new java.io.File("example1-file.txt"))
    val s3obj: Option[S3Object] = bucket.getObject("example1-uploaded-file.txt")
    logger.info(s"\n3 Uploaded: ${s3obj.getOrElse("Empty")} \n")
    logger.info(s"Received a request: $input")
    Response("Go Serverless v1.0!!!!! Your function executed successfully!!", input)
  }
}
    • Request.- Define la clase Request con los parámetros del evento de entrada.
import scala.beans.BeanProperty
class Request(@BeanProperty var key1: String, @BeanProperty var key2: String, @BeanProperty var key3: String) {
  def this() = this("", "", "")
  override def toString: String = s"Request($key1, $key2, $key3)"
}
    • Response.- Define la clase respuesta del tipo de retorno del Handler.
import scala.beans.BeanProperty
case class Response(@BeanProperty message: String, @BeanProperty request: Request)
    • ApiGatewayResponse.- Define la clase de respuesta para el caso de APIGateway.
case class ApiGatewayResponse(@BeanProperty statusCode: Integer, @BeanProperty body: String,
@BeanProperty headers: java.util.Map[String, String], @BeanProperty base64Encoded: Boolean = false)
  • Serverless.yml. El fichero serverless.yml es aquel lugar donde se configura la función para que sea desplegada en AWS. El fichero está compuesto por varias secciones en donde se define las variables, la función, o bien, aquellos recursos necesarios de AWS. Este fichero es el que empleará el framework Serverless para definir la plantilla de CloudFormation para su despliegue en AWS. La secciones son:
    • Service.- Definición del nombre del servicio de la función en AWS.
    • Provider.- definición de las variables internas a AWS.
    • Custom.- Definición de las variables específicas para la función como por ejemplo: nombre del proyecto, región,… proporcionada por los valores definidos en Provider, o bien, desde la línea de comando.
    • Environment.- Definición de las variables de entorno globales.
    • Package.- configuración del paquete a crear para realizar la subida a AWS. Se puede definir qué ficheros incluir o excluir, o bien, el nombre del jar con el que se trabaja.
    • Functions.- definición de la función Scala, definición de la referencia del rol, variables de entorno,…
    • Resources.- definición de los recursos empleados por la función en AWS; en nuestro caso, definición del role y las políticas de seguridad.

El contenido del fichero es el siguiente:

service: lambda-s3
provider:
  name: aws
  project: scalaproject
  runtime: java8
  stage: ${opt:stage, 'dev'}
  region: us-east-1
  timeout: 900
  iamRoleStatements:
    - Effect: Allow
      Action:
        - s3:GetObject
        - s3:PutObject
      Resource:
        - "arn:aws:s3:::prueba2fromlambdafunction/*"
custom:
  currentStage: ${opt:stage, self:provider.stage}
  currentProject: ${self:provider.project}
  currentRegion: ${opt:region, self:provider.region}
environment:

package:
  individually: true
  artifact: target/scala-2.13/lambda-s3.jar

functions:
  lambda-s3:
    handler: app.Handler
    role: LambdaRole
    environment:
      ENV: ${self:custom.currentStage}
resources:
  Resources:
    LambdaRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: ${self:custom.currentProject}-lambda-s3-${self:custom.currentStage}
          AssumeRolePolicyDocument:
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - lambda.amazonaws.com
              Action: sts:AssumeRole
        Policies:
          - PolicyName: ${self:custom.currentProject}-lambda-s3-${self:custom.currentStage}
            PolicyDocument:
              Statement:
                - Effect: Allow
                  Action:
                    - logs:CreateLogGroup
                    - logs:CreateLogStream
                    - logs:PutLogEvents
                    - s3:*
                  # - ec2:DescribeNetworkInterfaces
                  # - ec2:CreateNetworkInterface
                  # - ec2:DeleteNetworkInterface
                  # - ec2:DescribeInstances
                  # - ec2:AttachNetworkInterface
                 Resource: "*"

De snippet anterior resaltar las líneas comentadas en la definición de los permisos; éstas líneas, corresponden a los permisos que se deben de añadir si se desea que la función Lambda se ejecute en una subred de una VPC determinada.

Ciclo de vida

Configuración del profile de AWS. Para trabajar con AWS es necesario instalar el cliente de AWS y definir las credenciales del usuario para poder realizar las operaciones de despliegue en la cuenta de Amazon.

  • Creación del artefacto. Para realizar el despliegue, es necesario construir el artefacto con los componentes Scala y su ensamblado con las librerías necesarias. El comando SBT a ejecutar en la carpeta de la función es el siguiente:
sbt assembly
  • Despliegue de la función en Amazon AWS. El proceso de despliegue consiste en crear o modificar los recursos en AWS o el código de la función utilizando el stack de cloudformation asociado al fichero serverless.yml. El comando a ejecutar en la carpeta de la función es el siguiente:
serverless deploy -r us-east-1
  • Eliminación de la función. Si se desea eliminar la función se puede eliminar la función y sus recursos asociados con el siguiente:
serverless remove

Librería AWScala

La librería AWScala es aquella librería que permite realizar las operaciones con S3 u otros servicios de AWS. En nuestro caso, nos centraremos en definir las operaciones en S3.

  • Instancia del cliente S3. La creación de un cliente para realizar operaciones con S3 se realiza creando un componente de tipo S3. Dado que la función tiene asociado un role con los permisos de acceso, no es necesario asignar las credenciales. El snippet de ejemplo es el siguiente:
import awscala._, s3._
import awscala.s3._
import awscala.Region
implicit val region = Region.US_EAST_1
implicit val s3 = S3()
  • Listado de los buckets existentes. Una vez creado el cliente, se realiza la conexión al servicio S3 y, con la función buckets, obtenemos una lista con los bucket existentes. El snippet de ejemplo es el siguiente:
val buckets: Seq[Bucket] = s3.buckets
  • Creación de un bucket. De la misma manera que el caso anterior, el cliente S3 tiene una función de creación de bucket cuyo nombre es createBucket al cual se le pasa un nombre único del bucket a crear. El snippet de ejemplo es el siguiente:
val bucket: Bucket = s3.createBucket("prueba2fromlambdafunction")
  • Subir un fichero a S3. Para subir un fichero a S3, el cliente S3 emplea la función put al cual, como primer parámetro, se le pasa el nombre que tendrá en el bucket; y, como segundo parámetro, se le pasa un objeto File con la referencia del fichero. En nuestro caso, existe un fichero de texto example1-file.txt en el proyecto. El snippet de ejemplo es el siguiente:
bucket.put("example1-uploaded-file.txt", new java.io.File("example1-file.txt"))
  • Descarga de un fichero de S3. Para descargar un fichero de S3, el cliente emplea la función getObject a la cual se le pasa como parámetro el nombre del elemento a descargar. El snippet de ejemplo es el siguiente:

val s3obj: Option[S3Object] = bucket.getObject(«example1-uploaded-file.txt»)

Ejecución

Una vez desplegada la función con el comando de serverless deploy, hay que entrar en la consola de AWS y navegaremos hasta la consola de funciones lambda;y,
una vez en la consola, tendremos la referencia a la función. Para las pruebas, crearemos un evento con unos datos de pruebas como los siguientes:

{
"key1": "value1",
"key2": "value2",
"key3": "value3"
}

Para ejecutar la función, pulsaremos el botón de Test en la parte superior derecha; tras la ejecución, se reportará el resultado de la función y se crearán en el cloudwatch las trazas de la función. El aspecto de la consola de AWS con la información de la función es el siguiente:

El resultado de la función en S3 es la creación de un fichero en el bucket prueba2fromlambdafunction. La vista de la consola S3 tras la ejecución de la función es la siguiente:

 

Si el lector está interesado en el código puede acceder al siguiente repositorio de GitHub.

Conclusiones

La selección del lenguaje con el que se opera con AWS depende del equipo de desarrollo ya que, en función del conocimiento de los posibles lenguajes, se seleccionará uno u otro. Desde mi experiencia en los equipos en lod que he trabajado, siempre se ha elegido el lenguaje Python por su sencillez de uso utilizando la librería boto3. Con el presente ejemplo, quiero poner de manifiesto la sencillez con el lenguaje Scala y, dado que estamos construyendo funciones lambda sin infraestructura, utilizar un lenguaje con paradigma funcional permite construir componentes software orientados a la funcionalidad a desarrollar.

Inyección de dependencias en programación funcional III. Mónada Reader

Llegamos a la la última entrada de la sería de inyección de dependencias con la entrada, Inyección de dependencias en programación funcional III. Mónada Reader. El objetivo de la misma es mostrar al lector cómo se realiza la inyección de dependencias con la mónada Reader en lenguaje Scala. Para el lector interesado, las entradas de la serie son las siguientes:

La diferencia conceptual respecto a las otras dos es el uso de la mónada Reader. La mónada Reader es aquella mónada la cual puede leer un determinado componente; en dicho  componente, es donde definimos los elementos con las referencias de las funciones a inyectar. Así, necesitamos definir un elemento, en nuestro caso una case class, con las referencias a las funciones las cuáles están definidas en los componentes. Por otro lado, el servicio de negocio lo definimos a partir de un trait con un constructor de tipos.

Desde un punto de vista gráfico, la vista estática de los componentes queda definida como sigue:

Los tipos utilizados son los siguientes:

import cats.data.Reader
import cats.syntax.either._
import scala.language.higherKinds
object typesEjem3{
  type MensajeError = String
  type GetComponent1 = (String) => Either[MensajeError, String]
  type GetComponent2 = (Int) => Either[MensajeError, Int]
  type ResponseService = Either[MensajeError, String]
  type ParameterString = String
  type ParameterInt = Int
  type ServiceOperation[A] = Reader[ServiceContext, A]
  case class ServiceContext( funcComponent1: GetComponent1, funcComponent2: GetComponent2 )
}

La definición de los componentes de negocio del ejemplo son los representados por el objeto Component1Ejem3 y Component2Ejem3. El snippet del código de los componentes es el siguiente:

object Component1Ejem3{
  import typesEjem3._
  val response1: MensajeError = "Error en Response1"
  val doSomething: GetComponent1 = (elem: String) => {
    elem.length match {
      case lengthElem: Int if lengthElem > 0 => (elem + " modificado").asRight
      case _ => response1.asLeft
    }
  }
}
object Component2Ejem3{
  import typesEjem3._
  val response2: MensajeError = "Error en Response2"
  val doSomething: GetComponent2 = (num: Int) => {
    num match {
      case elem: Int if elem > 0 => elem.asRight
      case _ => response2.asLeft
    }
  }
}

La definición del servicio de negocio se realiza con un type class empleando un trait Service3 y el objeto ServiceImpl. Para el lector interesado en conocer lo que es un Type Class en los siguientes enlaces describo cómo se define y describe dicho patrón. Los enlaces son los siguientes:

El snippet del código del servicio es el siguiente:

trait Service3[ F[_] ]{
  def doBusiness(msg: typesEjem3.ParameterString): F[ Either[typesEjem3.MensajeError, String] ]
}
object ServiceImpl extends Service3[typesEjem3.ServiceOperation]{
  override def doBusiness(msg: typesEjem3.ParameterString): typesEjem3.ServiceOperation[Either[typesEjem3.MensajeError, String]] = Reader{ ctx =>
    for{
      response1 <- ctx.funcComponent1(msg).right
      response2 <- ctx.funcComponent2(msg.length).right
    }yield{
      response1 + "-" + response2
    }
  }
}

Como se muestra en el snippet anterior la función doBusiness del objeto ServiceImpl define la funcionalidad del servicio y es donde se utiliza la mónada Reader. La Mónada Reader se define de la siguiente manera : Reader[ServiceContext, A]; siendo la entrada de tipo ServiceContext; y, como salida, el tipo A el cual en nuestro caso es de tipo Either. Analizando la función, el objeto de entrada es de tipo ServiceContext con las referencias a los componentes que se inyectan y, como resultado, se retorna un elemento de tipo Either.

La aplicación que usa los anteriores elementos es la siguiente:

object Ejem3DependencyInyector extends App{
  import typesEjem3._
  def ejemplo1(): Unit = {
    val context = ServiceContext(Component1Ejem3.doSomething, Component2Ejem3.doSomething)
    val message1 = "Mensaje de prueba"
    ServiceImpl.doBusiness(message1).run(context) match {
      case Right(msg) => println(s"Test1=${msg}")
      case Left(error) => println(error)
    }
    println
  }
  ejemplo1()
}

En la aplicación anterior, se muestra cómo usar un servicio con una mónada Reader: lo primero, es definir una clase ServiceContext con las funciones de los componentes; segunda, crear e invocar la clase con la mónada usando la función run; y, para finalizar, tratar el resultado con un pattern matching.

La salida por consola es la siguiente:

Test1=Mensaje de prueba modificado-17

La definición de los test del servicio de negocio descrito en el ejemplo es el siguiente:

import cats.syntax.all._
import es.ams.dependencyinyector.typesEjem3.{ GetComponent1, GetComponent2, ServiceContext} //
import org.scalatest.{Matchers, WordSpec}
class Ejem3DependecyInyectorTest extends WordSpec with Matchers {
  "Example Mock" should {
    "Example OK" in {
      val context = ServiceContext(Component1Ejem3.doSomething, Component2Ejem3.doSomething)
      val msg: String = "prueba"
      val result: String = ServiceImpl.doBusiness(msg).run(context) match {
        case Right(msg) => { println(msg); msg}
        case Left(error) => error
      }
      result shouldBe(msg + " modificado-6")
    }
    "Example OK: mock component1" in {
      val funcGetResponse1Mock: GetComponent1 = (num: String) => "mock".asRight
      val context = ServiceContext(funcGetResponse1Mock, Component2Ejem3.doSomething)
      val msg: String = "prueba"
      val result: String = ServiceImpl.doBusiness(msg).run(context) match {
        case Right(msg) => { println(msg); msg}
        case Left(error) => error
      }
      assert(result.length > 0)
      assert(result.equals("mock-6"))
    }
    "Example OK: mock component2" in {
      val funcComponent2: GetComponent2 = (num: Int) => 0.asRight
      val context = ServiceContext(Component1Ejem3.doSomething,funcComponent2)
      val msg: String = "prueba"
      val result: String = ServiceImpl.doBusiness(msg).run(context) match {
        case Right(msg) => { println(msg); msg}
        case Left(error) => error
      }
      assert(result.length > 0)
    }
    "Example OK: mock component1 and mock component2" in {
      val funcGetResponse1Mock: GetComponent1 = (num: String) => "mock".asRight
      val funcGetResponse2Mock: GetComponent2 = (num: Int) => 0.asRight
      val context = ServiceContext(funcGetResponse1Mock, funcGetResponse2Mock)
      val msg: String = "prueba"
      val result: String = ServiceImpl.doBusiness(msg).run(context) match {
        case Right(msg) => { println(msg); msg}
        case Left(error) => error
      }
      assert(result.length > 0)
      assert(result.equals("mock-0"))
    }
  }
}

La inyección de dependencias desde un punto de vista funcional sigue la misma filosofía que la inyección de dependencias de objetos. La primera consecuencia es la desaparición de la utilización de framework de Mock necesarios en otros paradigmas como el utilizado en los lenguajes Java o Python. La utilización del paradigma funcional permite la composición de elementos más intuitiva aunque, evidentemente, la curva de aprendizaje es mayor.

Inyección de dependencias en programación funcional II

En la entrada anterior, Inyección de dependencias en programación funcional I, realicé la descripción de cómo se realizaba la inyección de funciones en programación funcional en lenguaje Scala; en la presente entrada, Inyección de dependencias en programación funcional II, modularizaré el código existente en la primera entrada organizando el código con una perspectiva orientada a objetos sin perder el aspecto funcional.

La vista estática del problema es la definida en el diagrama de clases de la siguiente imagen:

 

Los tipos utilizados en el ejemplo son los siguientes:

import cats.syntax.either._
object typesEjem2{
  type MensajeError = String
  type GetComponent1 = (String) => Either[MensajeError, String]
  type GetComponent2 = (Int) => Either[MensajeError, Int]
  type ResponseService = Either[MensajeError, String]
  type ParameterString = String
  type ParameterInt = Int
  type BusinessService = (GetComponent1, GetComponent2) => ParameterString => ResponseService
}

La definición de los componentes de negocio del ejemplo son los representados por los objetos Component1 y Component2. Respecto al ejemplo de la entrada anterior, se han definido las funciones dentro de un objeto con lo cual modularizamos la funcionalidad. El snippet del código de los componentes es el siguiente:

object Component1{
  import typesEjem2._
  val response1: MensajeError = "Error en Response1"
  val doSomething: GetComponent1 = (elem: String) => {
    elem.length match {
      case lengthElem: Int if lengthElem > 0 => (elem + " modificado").asRight
      case _ => response1.asLeft
    }
  }
}
object Component2{
  import typesEjem2._
  val response2: MensajeError = "Error en Response2"
  val doSomething: GetComponent2 = (num: Int) => {
    num match {
      case elem: Int if elem > 0 => elem.asRight
      case _ => response2.asLeft
    }
  }
}

La definición del servicio de negocio del ejemplo es el definido por el objeto Service. La estrategia de modularización es la misma que con los componentes. El snippet del código del servicio es el siguiente:

object Service{
  import typesEjem2._
  val doBusinessActivity: BusinessService = (objComp1, objComp2) => (msg) => {
    for {
      respon1 <- objComp1 (msg)
      respon2 <- objComp2(msg.length)
    } yield {
      respon1 + "-" + respon2
    }
  }
}

La aplicación de ejemplo que usa los anteriores elementos es la siguiente:

object Ejem2DependencyInyectorApp extends App {
  def ejemplo1(): Unit = {
    val message1 = "Mensaje de prueba"
    Service.doBusinessActivity(Component1.doSomething, Component2.doSomething)(message1) match {
      case Right(msg) => println(s"Test1=${msg}")
      case Left(error) => println(error)
    }
    val message2 = ""
    Service.doBusinessActivity(Component1.doSomething, Component2.doSomething)(message2) match {
      case Right(msg) => println(s"Test2=${msg}")
      case Left(error) => println(error)
    }
  }
  ejemplo1()
}

La salida por consola es la siguiente:

Test1=Mensaje de prueba modificado-17
Error en Response1

La definición de los test del servicio de negocio descrito en el ejemplo es el siguiente:

import org.scalatest.{Matchers, WordSpec}
import es.ams.dependencyinyector.typesEjem2.{GetComponent1, GetComponent2}
import cats.syntax.all._
class Ejem2DependecyInyectorTest extends WordSpec with Matchers {
  "Example Mock" should {
    "Example OK" in {
      val msg: String = "prueba"
      val result: String = Service.doBusinessActivity(Component1.doSomething, Component2.doSomething)(msg) match {
        case Right(msg) => { println(msg); msg}
        case Left(error) => error
      }
      result shouldBe(msg + " modificado-6")
    }
  "Example OK: mock component1" in {
    val funcGetResponse1Mock: GetComponent1 = (num: String) => "mock".asRight
    val msg: String = "prueba"
    val result: String = Service.doBusinessActivity(funcGetResponse1Mock, Component2.doSomething)(msg) match {
      case Right(msg) => { println(msg); msg}
      case Left(error) => error
    }
    assert(result.length > 0)
    assert(result.equals("mock-6"))
  }
  "Example OK: mock component2" in {
    val funcComponent2: GetComponent2 = (num: Int) => 0.asRight
    val msg: String = "prueba"
    val result: String = Service.doBusinessActivity(Component1.doSomething, funcComponent2)(msg) match {
      case Right(msg) => { println(msg); msg}
      case Left(error) => error
    }
    assert(result.length > 0)
  }
  "Example OK: mock component1 and mock component2" in {
    val funcGetResponse1Mock: GetComponent1 = (num: String) => "mock".asRight
    val funcGetResponse2Mock: GetComponent2 = (num: Int) => 0.asRight
    val msg: String = "prueba"
    val result: String =Service.doBusinessActivity(funcGetResponse1Mock, funcGetResponse2Mock)(msg) match {
      case Right(msg) => { println(msg); msg}
      case Left(error) => error
    }
    assert(result.length > 0)
    assert(result.equals("mock-0"))
    }
  }
}

En esta entrada he realizado la modularización del código definido en la entrada, Inyección de dependencias en programación funcional I; en la siguiente entrada, subiré el nivel de abstracción y describiré el mismo problema utilizando la mónada Reader.

Inyección de dependencias en programación funcional I

La inyección de dependencias es un patrón que en otros paradigmas y lenguajes es un patrón muy utilizado; por ejemplo en Java, el framework Spring, se basa en el patrón de inyección de dependencias de objetos. En la programación funcional, la inyección de dependencias se realiza inyectando funciones, no objetos. En la presente entrada, Inyección de dependencias en programación funcional I, describiré la forma de inyectar funciones en lenguja Scala.

Supongamos que tenemos dos funciones que implementan la siguiente funcionalidad: la primera, dada un valor de tipo String de entrada, realiza la transformación de dicho parámetro concatenándole el valor «modificado»; la segunda función, dado un valor entero de entrada si es mayor a cero retorna dicho valor; en otro caso para las dos funciones, retorna un mensaje de error. El valor de retorno es un contenedor binario de tipo Either.

Por otro lado, definimos una función servicio que realiza una operación de negocio la cual utiliza las dos funciones anteriores descritas previamente. A esta función, para realizar su funcionalidad, necesitará que se le inyecten las funciones.

El objetivo del ejemplo es entender la inyección, no en definir una solución a un problema complejo.

Definición de tipos

Los tipos GetComponent1 y GetComponent2 definen las funciones básicas; el tipo Service, define la función de negocio; ResponseService, define el contenedor binario de respuesta del servicio; y, los tipos Parameter y MensajeError, otros tipos básicos necesarios.

En el siguiente snippet se define la definición en lenguaje Scala.

type MensajeError = String
type GetComponent1 = (String) => Either[MensajeError, String]
type GetComponent2 = (Int) => Either[MensajeError, Int]
type ResponseService = Either[MensajeError, String]
type Parameter = String
type Service = (GetComponent1, GetComponent2) => (Parameter) => (ResponseService)

Definición de componentes

Teniendo la definición de tipos, necesitamos la implementación de las funciones. La primera función, definida con el tipo GetComponent1, define una función cuyo parámetro de entrada es de tipo String, si el parámetro de entrada es un string cuya longitud es mayor a 0, retorna un elemento Right del tipo Either con la concatenación de la propia cadena de entrada y la palabra » modificado».

La segunda función, definida con el tipo GetComponent2, define una función cuyo parámetro de entrada es de tipo entero, si el valor de entrada es mayor a cero, retorna el mismo valor,en otro caso, retorno un elemento Right de tipo entero con el valor de entrada.

El código con la definición de las funciones es la siguiente:

val response1: MensajeError = "Error en Response1"
val funcGetResponse1: GetComponent1 = (elem: String) => {
  elem.length match {
    case lengthElem: Int if lengthElem > 0 => (elem + " modificado").asRight
    case _ => response1.asLeft
  }
}
val response2: MensajeError = "Error en Response2"
val funcGetResponse2: GetComponent2 = (num: Int) => {
  num match {
    case elem: Int if elem > 0 => elem.asRight
    case _ => response2.asLeft
  }
}

Definición del servicio

La definición de la función de servicio tiene un formado tipo Curry. Los primeros parámetros corresponde con las funciones de los tipo GetComponent1 y 2; el segundo grupo, corresponde con el parámetro que se empleará en las función; y, por último, se define la funcionalidad propiamente de negocio; en esta última parte, es donde se define la funcionalidad de negocio con las funciones inyectadas y los parámetros, así como, el resultado parcial, si fuera necesario, de las funciones.

El código con la definición de la función de servicio es la siguiente:

val funcService: Service = (getComponent1, getComponent2) => (msg) => {
  for {
    respon1 <- getComponent1(msg)
    respon2 <- getComponent2(msg.length)
  } yield {
    respon1 + "-" + respon2
  }
}

La función del servicio, funcService, recibe por parámetro aquellas funciones que le son necesarias, es decir, se le inyecta los elementos necesarios para realizar su operativa.

Dados las descripciones de las funciones de los apartados anteriores, una ejemplo básico de uso de la función servicio con la inyección de funciones es el siguiente:

object Ejem1DependecyInyectorApp extends App {
  import Ejem1DependecyInyector._
  def ejemplo1(): Unit = {
    val message1 = "Mensaje de prueba"
    funcService(funcGetResponse1, funcGetResponse2)(message1) match {
      case Right(msg) => println(s"Test1=${msg}")
      case Left(error) => println(error)
    }
    val message2 = ""
    funcService(funcGetResponse1, funcGetResponse2)(message2) match {
      case Right(msg) => println(s"Test2=${msg}")
      case Left(error) => println(error)
    }
  }
  ejemplo1()
}

La salida por consola es la siguiente:

Test1=Mensaje de prueba modificado-17
Error en Response1

Test

Una tarea fundamental en el desarrollo del software es la realización de pruebas unitarias de aquellos componentes realizados. En nuestro caso, las pruebas unitarias de la función servicio dependerán de los resultado de las funciones inyectadas;y, para sus pruebas, es necesario moquear aquellas funciones inyectadas. En programación función con el patrón que presento, las pruebas se simplifican porque no hay que utilizar un framework específico, simplemente, necesitamos definir una función con un determinado valor la cual se inyecta a la función servicio.

Los test unitarios de la función servicio presentada son los siguientes:

class Ejem1DependecyInyectorTest extends WordSpec with Matchers {
  "Example Mock" should {
    "Example OK" in {
      val msg: String = "prueba"
      val result: String = funcService(funcGetResponse1, funcGetResponse2)(msg) match {
         case Right(msg) => { println(msg); msg}
         case Left(error) => error
      }
      result shouldBe(msg + " modificado-6")
    }
    "Example OK: mock component1" in {
       val funcGetResponse2Mock: GetComponent2 = (num: Int) => 0.asRight
       val msg: String = "prueba"
       val result: String = funcService(funcGetResponse1, funcGetResponse2Mock )(msg) match {
          case Right(msg) => { println(msg); msg}
          case Left(error) => error
       }
       assert(result.length > 0)
    }
    "Example OK: mock component2" in {
       val funcGetResponse1Mock: GetComponent1 = (num: String) => "mock".asRight
       val msg: String = "prueba"
       val result: String = funcService(funcGetResponse1Mock, funcGetResponse2 )(msg) match {
          case Right(msg) => { println(msg); msg}
          case Left(error) => error
       }
       assert(result.length > 0)
       assert(result.equals("mock-6"))
    }
    "Example OK: mock component1 and mock component2" in {
       val funcGetResponse1Mock: GetComponent1 = (num: String) => "mock".asRight
       val funcGetResponse2Mock: GetComponent2 = (num: Int) => 0.asRight
       val msg: String = "prueba"
       val result: String = funcService(funcGetResponse1Mock, funcGetResponse2Mock )(msg) match {
          case Right(msg) => { println(msg); msg}
          case Left(error) => error
       }
       assert(result.length > 0)
       assert(result.equals("mock-0"))
    }
  }
}

En las próximas entregas, continuaré profundizando en la inyección de dependencias.

Patrón Traverse en cats

En la entrada anterior, Patrón Fodable en Cats, realicé una descripción de cómo se realizaban morfismos con tipos de datos algebraicos (ADT) utilizando la implementación del tipo Foldable de la librería cats. En la presente entrada, Patrón Traverse en Cats, me centraré en el tipo Traverse.

El tipo Traverse tiene dos funciones: traverse y sequence; en los siguientes apartados, realizaré la descripción de cada una.

1.- Traverse

El tipo Traverse define la función traverse la cual permite realizar lo siguiente: dado un tipo de entrada y dada una función de transformación; la función traverse permite: la iteración sobre el tipo de entrada, aplica la función a cada elemento de la entrada, acumular el resultado y retornar su resultado. Un ejemplo de una definición de función traverse puede ser el siguiente:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import cats.syntax.applicative._
import cats.syntax.apply._
def getFutureTest(msg: String): Future[Int] = 
   Future{msg.length * 10 }
def myTraverse[A,B](list: List[A])(f: A => Future[B]): Future[List[B]] =
  list.foldLeft(Future(List.empty[B])){ (acc, elem) => {
      val resultElem = f(elem)
      for{
         acc <- acc
         elem <- resultElem
      }yield{ acc :+ elem }
    }
  }
val listExample1 = List ("a", "aa", "aaa")
val resultExample1 = myTraverse(listExample1)(getFutureTest)
println(s"myTraverse(List ('a', 'aa', 'aaa'))-->${Await.result( resultExample1, 5.seconds )}")

La salida por consola es la siguiente:

myTraverse(List ('a', 'aa', 'aaa'))-->List(10, 20, 30)

El snippet anterior define lo siguiente: getFutureTest, función que retorna un Future de enteros que retorna la longitud del string pasado por parámetro multiplicado por 10; myTraverse, función traverse implementado con foldLeft la cual opera con una lista y una función f que retorna un Futuro del tipo B a partir del tipo A; listExample1, una lista de pruebas; y, por último, el mensaje con la función traverse y su visualización por pantalla.

1.1.- Traverse con Applicative

La función traverse podemos simplificarla utilizando tipos que cumplan el patrón Applicative la cual contiene operaciones del patrón Semigroupal como la función mapN; así, la función traverse, se puede redefinir de la siguiente manera:

def myTraverse2[F[_]: Applicative, A,B](list: List[A])(f: A => F[B]): F[List[B]] =
  list.foldLeft( List.empty[B].pure[F] ){
      (acc, elem) => (acc, f(elem)).mapN(_ :+ _)
  }
import cats.instances.option._
def process(list: List[Int]) = {
   myTraverse2(list)(n => if(n%2==0) Some(n) else None)
}
println(s"--Ejemplo3--")
println(s"process(List(2,4,6))==>>${process(List(2,4,6))}")
println(s"process(List(1,2,3))==>>${process(List(1,2,3))}")

La salida por consola es la siguiente:

process(List(2,4,6))==>>Some(List(2, 4, 6))
process(List(1,2,3))==>>None

1.2.- Traverse con Validated

El siguiente ejemplo, permite la validación de los elementos de una lista en función de un criterio: los elementos pares son válidos y, los impares, son inválidos. El snippet con la solución es la siguiente:

import cats.data.Validated
import cats.instances.list._
type ErrorOn[A] = Validated[ List[String] ,A]
def myTraverse2[F[_]: Applicative, A,B](list: List[A])(f: A => F[B]): F[List[B]] =
   list.foldLeft( List.empty[B].pure[F] ){
      (acc, elem) => (acc, f(elem)).mapN(_ :+ _)
    }
def process(list: List[Int]): ErrorOn[List[Int]] = {
   myTraverse2(list){ n =>
     if(n%2==0){
        Validated.valid(n)
     }else{
        Validated.invalid(List(s"$n no está incluido."))
     }
   }
}
println(s"process(List(2,4,6))==>>${process(List(2,4,6))}")
println(s"process(List(1,2,3))==>>${process(List(1,2,3))}")
println(s"process(List(2,4,5,6))==>>${process(List(2,4,5,6))}")

La salida por consola es la siguiente:

process(List(2,4,6))==>>Valid(List(2, 4, 6))
process(List(1,2,3))==>>Invalid(List(1 no está incluido., 3 no está incluido.))
process(List(2,4,5,6))==>>Invalid(List(5 no está incluido.))

1.3.- Función traverse con el tipo traverse

En los apartados anteriores, me he centrado en mostrar ejemplos de la función traverse con una implementación propia. En el siguiente ejemplo, muestro un ejemplo con la función traverse del tipo Traverse. La funcionalidad del ejemplo consiste en procesar una lista de futuros, el snippet es el siguiente:

import cats.Traverse
import cats.instances.all._
val listExample1 = List ("a", "aa", "aaa")
def getFutureTest(msg: String): Future[Int] = 
  Future{msg.length * 10}
val result1:Future[List[Int]] = Traverse[List].traverse(listExample1)(getFutureTest)
println(s"Traverse1=${Await.result( result1, 2.seconds )}")
val listExampleSequence1 = List( Future(1), Future(2), Future(3))
val result2: Future[List[Int]] = Traverse[List].sequence(listExampleSequence1)
println(s"Sequence1=${Await.result( result2, 2.seconds )}")

La salida por consola es la siguiente:

Traverse1=List(10, 20, 30)
Sequence1=List(1, 2, 3)

2.- Sequence

Por otro lado, el tipo Traverse define la función sequence la cual permite recorrer los elementos de un tipo de entrada y realizar los cambios de tipos. Un ejemplo de una función sequence puede ser la siguiente:

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import cats.syntax.applicative._
import cats.syntax.apply._
def getFutureTest(msg: String): Future[Int] = 
    Future{msg.length * 10 }
def mySequence[B](list:List[Future[B]]): Future[List[B]] =
    myTraverse(list)(identity)
val listExampleSequence1 = List( getFutureTest("a"), getFutureTest("aa"), getFutureTest("aaa"))
val resultExample2 = mySequence(listExampleSequence1)
println(s"myTraverse(List (Future('a'), Future('aa'), Future('aaa'))-->${Await.result( resultExample2, 5.seconds )}")

La salida por consola es la siguiente:

myTraverse(List (Future('a'), Future('aa'), Future('aaa'))-->List(10, 20, 30)

El snippet anterir define lo siguiente: getFutureTest, función que retorna un Future de enteros que retorna la longitud del string pasado por parámetro multiplicado por 10; mySequence,  función que emplea la función traverse para realizar la transformación; listExampleSequence1, lista con los datos de prueba; y, por último, el mensaje con la función sequence y su visualización.

La funcionalidad del ejemplo anterior implemantado con la función sequence de Traverse queda descrito en el siguiente enjemplo:

import cats.Traverse
import cats.instances.all._
val listExampleSequence1 = List( getFutureTest("a"), getFutureTest("aa"), getFutureTest("aaa"))
val result1:Future[List[Int]] = Traverse[List].sequence(listExampleSequence1)
println(s"myTraverse(List (Future('a'), Future('aa'), Future('aaa'))-->${Await.result( result1, 5.seconds )}")

La salida por consola es la siguiente:

myTraverse(List (Future('a'), Future('aa'), Future('aaa'))-->List(10, 20, 30)

3.- Definición formal de Traverse

La definición formal del trait con la funcionalidad Traverse es la siguiente:

package cats
trait Traverse[F[_]] {
  def traverse[G[_]: Applicative, A, B] (inputs: F[A])(func: A => G[B]): G[F[B]]
  def sequence[G[_]: Applicative, B] (inputs: F[G[B]]): G[F[B]] = 
traverse(inputs)(identity)
}

Para finalizar la entrada y como conclusión final, el tipo Traverse es un patrón conseguido y comprensible a partir del patrón Foldable y la función fold. Traverse permite realizar la iteración y operación sobre colecciones de tipos y, además, realizar acumuladores de resultados de dichas colecciones.

Patrón Fodable en Cats

En la programación funcional uno de los conceptos base son los tipos de datos algebráicos (ADT) Los ADT son estructuras de datos basadas en las matemáticas cuyas operaciones se realizan mediante morfismos; y, los mosfirmos, se realizan mediante la función fold y sus derivados: foldRight y foldLeft. En la entrada de hoy, Patrón Foldable en Cats, realizaré la descripción de los morfismos utilizando la type class Foldable de la librería Cats.

1.- Definición de un ADT de tipo List

Un ADT es aquel tipo de dato con el que podemos realizar unas operaciones, como por ejemplo: la operación suma y producto; y, además, cumple unas propiedades  matemáticas como pueden ser la propiedad asociativa, distributiva, o bien, de identidad.

En el siguiente ejemplo, se muestra la definición del ADT de tipo MyList, el cual equivale al ADT de tipo List.

sealed trait MyList[+A]
case object Nil extends MyList[Nothing]
case class Cons[+A](elem: A, lista: MyList[A]) extends MyList[A]

La operación suma es aquella operación que, a nivel de programación, se corresponde con las relaciones de herencia entre la clase Cons y el objeto Nil con el trait MyList. La operación producto es aquella operación que, a nivel de programación, se corresponde con los parámetros de la clase Cons: elem y lista.

Una vez definido el ADT una de las formas de manipular dicha estructura es utilizando morfismos, función fold y sus derivados. La función fold equivale a la función foldRight. La definición de la función foldRight y foldLeft con el ADT MyList son los siguientes:

  • Morfismo foldRight para el ADT MyList.
def foldRight[A, B](lista: MyList[A], elem: B)(f: (A, B) => B): B = lista match {
  case Nil => elem
  case Cons(head, tail) => f(head, foldRight(tail, elem)(f))
}
  • Morfismo foldLeft para el ADT MyList.
@annotation.tailrec
def foldLeft[A, B](lista: MyList[A], elem: B)(f: (B, A) => B): B = lista match {
  case Nil => elem
  case Cons(head, tail) => foldLeft(tail, f(elem, head))(f)
}

fold, foldRight, foldLeft

En los siguientes apartados, realizaremos la descripción de ejemplos de uso de las operaciones fold con el type class que proporciona la librería Cats y con la librería estándar.

2.- Ejemplos de morfismos con el tipo List

En el presente apartado, realizaré la descripción de ejemplos con la función fold del ADT List de la librería estándar.

  • Ejemplos básicos de morfismo foldRight.- Definición de una construcción de una lista y suma de sus elementos con una lista de tipos de enteros y foldRight.
println(s"1.- foldRight=${ List(1,2,3).foldRight(List.empty[Int])( (e, acc) => e :: acc) }")
println(s"2.- foldRight=${ List(1,2,3).foldRight(0)( (e, acc) => e + acc ) }")
println(s"Suma con foldRight=${List(1, 2, 3, 4).foldRight(0)(_ + _)}")

La salida por consola es la siguiente:

1.- foldRight=List(1, 2, 3)
2.- foldRight=6
Suma con foldRight=10
  • Ejemplos básicos de morfismo foldLeft.- Definición de una construcción de una lista y suma de sus elementos con una lista de tipos de enteros y foldLeft.
println(s"1.- foldLeft=${ List(1,2,3).foldLeft(List.empty[Int])((acc, e) => e :: acc) }")
println(s"2.- foldLeft=${ List(1,2,3).foldLeft(0)( (acc, e) => acc + e ) }")

La salida por consola es la siguiente:

1.- foldLeft=List(3, 2, 1)
2.- foldLeft=6
  • FoldRight y el tipo Numeric.- Definición de una función suma empleando una lista de enteros y el tipo Numeric.
import scala.math.Numeric
def sumaConNumeric[A](list:List[A])(implicit numeric: Numeric[A]): A =
list.foldRight(numeric.zero)(numeric.plus)
println(s"Suma con Numeric=${sumaConNumeric(List(1, 2, 3, 4))}")
println

La salida por consola es la siguiente:

Suma con Numeric=10
  • FoldRight y monoides.- Definición de la operación suma sobre una lista de enteros empleando monoides.
import cats.Monoid
import cats.instances.int._ // for Monoid
def sumaConMonoid[A](list:List[A])(implicit monoid: Monoid[A]): A =
list.foldRight(monoid.empty)(monoid.combine)
println(s"Suma con Momoid=${sumaConMonoid(List(1, 2, 3, 4))}")

La salida por consola es la siguiente:

Suma con Momoid=10
  • FoldRight y definición de filtros.- Definición de unos filtros sobre una lista de enteros
val elemFilter1: Int = 3
println(s"List(1, 2, 3, 4) existe el 3?=${List(1, 2, 3, 4).foldRight(false)( (elem, resul) => resul || elem.equals(elemFilter1))}")
val elemFilter2: Int = 5
println(s"List(1, 2, 3, 4) existe el 5?=${List(1, 2, 3, 4).foldRight(false)( (elem, resul) => resul || elem.equals(elemFilter2))}")
def myfilter[A](list: List[A])(func: A => Boolean): List[A] =
list.foldRight(List.empty[A]) { (item, accum) => if(func(item)) item :: accum else accum }
println(s"List(1, 2, 3, 4) filtra los pares.=${ myfilter(List(1, 2, 3, 4))(_%2==0) }")

La salida por consola es la siguiente:

List(1, 2, 3, 4) existe el 3?=true
List(1, 2, 3, 4) existe el 5?=false
List(1, 2, 3, 4) filtra los pares.=List(2, 4)
  • FoldRight y definición de función map.- Definición de una función map con foldRight.
def myMap[A,B](list: List[A])(f: A => B): List[B] = list.foldRight(List.empty[B])( (elem, result) => f(elem) :: result )
println(s"List(1, 2, 3) map to String=${List(1, 2, 3).foldRight(List.empty[String])( (elem, resul) => s"-${elem.toString}-" :: resul)}")
println(s"List(1, 2, 3) map to String=${ myMap(List(1, 2, 3))( (elem:Int) => s"*${elem.toString}*" ) }")
println

La salida pos consola es la siguiente:

List(1, 2, 3) map to String=List(-1-, -2-, -3-)
List(1, 2, 3) map to String=List(*1*, *2*, *3*)
  • FoldRight y definición de función flatMap.- Definición de una función flatMap con foldRight.
def flatMap[A, B](list: List[A])(func: A => List[B]): List[B] =
list.foldRight(List.empty[B]) { (item, accum) => func(item) ::: accum }
println(s"-->>${flatMap(List(1, 2, 3))(a => List(a, a * 10, a * 100))}")
println

La salida por consola es la siguiente:

-->>List(1, 10, 100, 2, 20, 200, 3, 30, 300)

3.- Ejemplos con Foldable de cats.

Para poder operar con el tipo Foldable es necesario, al menos, realizar la importación de los siguientes tipos:

import cats.Foldable
import cats.instances.all._
  • Ejemplo de Foldable con función foldLeft con los tipos List, Vector, Stream y Option.
println(s"Suma List(1, 2, 3)=${Foldable[List].foldLeft(List(1, 2, 3), 0)( _ + _ )}")
println(s"Suma Vector(1, 2, 3)=${Foldable[Vector].foldLeft(Vector(1, 2, 3), 0)( _ + _ )}")
println(s"Suma Stream(1, 2, 3)=${Foldable[Stream].foldLeft(Stream(1, 2, 3), 0)( _ + _ )}")
println(s"Suma Option(10) + 5=${Foldable[Option].foldLeft(Option(10), 0)( (acc, elem) => elem + 5 )}")

La salida por consola es la siguiente:

Suma List(1, 2, 3)=6
Suma Vector(1, 2, 3)=6
Suma Stream(1, 2, 3)=6
Suma Option(10) + 5=15
  • StackOverflowError con función foldRight.

Supongamos que queramos realizar la suma de una estructura de tipo Stream de 100000 elementos, la definición de la solución sería la siguiente:

val lista = (1 to 100000).toStream
println(s"Suma (1 to 100000).toStream->${ lista.foldRight(0L)(_ + _) }")

El resultado de la ejecución del snippet anterior es errónea porque se produce un desbordamiento de la pila del sistema y nos aparece en consola un error de tipo StackOverflowError. Una solución a este problema utilizando la función foldRight es utilizando la mónada Eval. Si el lector está interesado en la mónada Eval, pude ir a las siguientes enlace.El snippet es el siguiente:

import cats.Eval
val resultEvalStream = Foldable[Stream].foldRight(lista, Eval.now(0L)) ((num, acc) => acc.map( _ + num))
println(s"Suma (1 to 100000).toStream->${ resultEvalStream.value }")

La salida por consola es la siguiente:

Suma (1 to 100000).toStream->5000050000
  • Ejemplos de funciones básicos de Foldable con tipo Option.
println(s"Foldable[Option].nonEmpty(Option(42))=${Foldable[Option].nonEmpty(Option(42))}" )
println(s"Foldable[Option].isEmpty(Option(42))=${Foldable[Option].isEmpty(Option(42))}" )
println(s"Foldable[Option].size(Option(42))=${Foldable[Option].size(Option(42))}" )
println(s"Foldable[Option].get(Option(42))(0)=${Foldable[Option].get(Option(42))(0) }" )
println(s"Foldable[Option].find(Option(42))( elem => elem>30)=${Foldable[Option].find(Option(42))( elem => elem>30) }" )
println

La salida por consola es la siguiente:

Foldable[Option].nonEmpty(Option(42))=true
Foldable[Option].isEmpty(Option(42))=false
Foldable[Option].size(Option(42))=1
Foldable[Option].get(Option(42))(0)=Some(42)
Foldable[Option].find(Option(42))( elem => elem>30)=Some(42)
  • Ejemplos de funciones básicas de Foldable con tipo List.
println(s"Foldable[Option].nonEmpty(List(1, 2, 3)=${Foldable[List].nonEmpty(List(1, 2, 3))}" )
println(s"Foldable[Option].isEmpty(List(1, 2, 3))=${Foldable[List].isEmpty(List(1, 2, 3))}" )
println(s"Foldable[Option].size(List(1, 2, 3)=${Foldable[List].size(List(1, 2, 3))}" )
println(s"Foldable[Option].get(List(1, 2, 3)(0)=${Foldable[List].get(List(1, 2, 3))(0)}" )
println(s"Foldable[Option].get(List(1, 2, 3)(1)=${Foldable[List].get(List(1, 2, 3))(1)}" )
println(s"Foldable[Option].get(List(1, 2, 3)(4)=${Foldable[List].get(List(1, 2, 3))(4)}" )
println(s"Foldable[Option].find(List(1, 2, 3)(4)=${Foldable[List].find(List(1, 2, 3))( elem => (elem%2==0) )}" )
println(s"Foldable[Option].find(List(1, 2, 3)(4)=${Foldable[List].find(List(1, 2, 3))( elem => (elem%2!=0) )}" )
println

La salida por consola es la siguiente:

Foldable[Option].nonEmpty(List(1, 2, 3)=true
Foldable[Option].isEmpty(List(1, 2, 3))=false
Foldable[Option].size(List(1, 2, 3)=3
Foldable[Option].get(List(1, 2, 3)(0)=Some(1)
Foldable[Option].get(List(1, 2, 3)(1)=Some(2)
Foldable[Option].get(List(1, 2, 3)(4)=None
Foldable[Option].find(List(1, 2, 3)(4)=Some(2)
Foldable[Option].find(List(1, 2, 3)(4)=Some(1)
  • Ejemplo de Foldable con monoides. El tipo Foldable define operaciones con monoides.
import cats.instances.all._
println(s"Foldable[Option].combineAll(List(1, 2, 3))=${Foldable[List].combineAll(List(1, 2, 3))}" )
println

La salida por consola es la siguiente:

Foldable[Option].combineAll(List(1, 2, 3))=6
  • Ejemplo de Foldable con función map. El tipo Foldable define la función foldMap para definir funciones con la funcionalidad de fold y la función map.
import cats.instances.all._
println(s"Foldable[List].foldMap(List(1, 2, 3))( elem => elem + 20) =${Foldable[List].foldMap(List(1, 2, 3))( elem => elem + 20) }")
println

La salida por consola es la siguiente:

Foldable[List].foldMap(List(1, 2, 3))( elem => elem + 20) =66

El entendimiento y el uso de los  morfismos facilita y simplifica el código; y, la utilización de Foldable, permite una versatilidad para cualquier operación.

Scala: Future con Ejemplos

En todo proyecto o aplicación informática es habitual realizar alguna operación asíncrona, es decir, ejecutar una operación en donde se lanza un mensaje de una operación sin quedarte bloqueado a la espera de su resultado. En la entrada de hoy, «Scala Future con ejemplos», voy a presentar unos ejemplos de utilización de Future desde un punto de vista practico.

Sin ser exhaustivo, podemos definir Future como aquel objeto que contiene un valor el cual estará disponible en algún instante.

La estructura de los ejemplos es incremental en dificultad y los ejemplos que presento son ejemplos que en nuevas versiones del lenguaje pueden presentar diferencias. Los ejemplos son los siguientes:

  1. Ejemplo 1 básico desde consola
  2. Ejemplo 2 básico desde consola.
  3. Ejemplo 3 básico desde consola.
  4. Ejemplo 4: Future y tratamiento de errores con recover.
  5. Ejemplo 5: Future y tratamiento de errores con recoverWith.
  6. Ejemplo 6: Future y ejecución paralela con función fallbackTo.
  7. Ejemplo 7: Future y ejecución paralela con función zip.
  8. Ejemplo 8: Future y ejecución paralela con for comprehension.
  9. Ejemplo 9: Tratamiento de tareas Future para aquella que acabe primero.

Ejemplo 1 básico desde consola

El ejemplo más básico es ejecutar un código en la consola Scala; para ello, arrancamos la consola; insertamos el comando «:paste» y, posteriormente, copiamos el siguiente snippet de código finalizando con Ctrl- D.

El ejemplo define un Future en el cual se lanza una excepción; una vez recibida el resultado, se escribe por la salida estándar.

import scala.concurrent._
import ExecutionContext.Implicits.global
val futureFail = Future { throw new Exception("Error!") }
futureFail.foreach( value => println("->" + value) )

La salida de la ejecución es la siguiente:

import scala.concurrent._
import ExecutionContext.Implicits.global
futureFail: scala.concurrent.Future[Nothing] = Future(Failure(java.lang.Exception: Error!))

Ejemplo 2 básico desde consola

Continuamos con la consola y, en este segundo ejemplo, el snippet del código se centra
en la gestión del resultado del Future con la función onComplete y los objetos Success
y Failure. El código es el siguiente:

import scala.util._
import scala.concurrent._
import ExecutionContext.Implicits.global
val futureFail = Future {
  throw new Exception("Error!")
}
futureFail.onComplete {
  case Success(value) => println("Success:" + value)
  case Failure(e) => println("Respuesta Failure:" + e)
}

La salida de la ejecución es la siguiente:

import scala.util._
import scala.concurrent._
import ExecutionContext.Implicits.global
futureFail: scala.concurrent.Future[Nothing] = Future(<not completed>)
Respuesta Failure:java.lang.Exception: Error!

Ejemplo 3 básico desde consola

La funcionalidad de un Future puede ser una función completa y, en su definición funcional, podemos utilizar funciones, o bien, definir Future en funciones.

En el presente snippet, se definen dos funciones que ejecutan Future: getEvent y getTraffic; además, se define una secuencia de ejecución de Future empleando las funciones anteriores: futureStep1 y futureStep2; el resultado de la ejecución de la secuencia, lo realiza futureStep2 el cual controla el resultado empleando objetos Success y Failure.

import scala.util._
import scala.concurrent._
import ExecutionContext.Implicits.global
def getEvent(parametro: String): Future[String] = {
  val resultadoGetEvent = Future{
    val resultado = "getEvent: " + parametro
    resultado
  }
  resultadoGetEvent
}
def getTraffic(parametro: String): Future[String] = {
  val resultadoGetTraffic = Future {
    val resultado = "getTraffic: '" + parametro + "'"
    resultado
  }
  resultadoGetTraffic
}
val futureStep1: Future[String] = getEvent("PruebaEvent")
val futureStep2: Future[String] = {
  futureStep1.flatMap { response =>
    getTraffic(response)
  }
}
futureStep2.onComplete {
  case Success(value) => println("futureStep2 Success:" + value)
  case Failure(e) => println("futureStep2 Failure:" + e)
}

La salida de la ejecución es la siguiente:

import scala.util._
import scala.concurrent._
import ExecutionContext.Implicits.global
getEvent: (parametro: String)scala.concurrent.Future[String]
getTraffic: (parametro: String)scala.concurrent.Future[String]
futureStep1: scala.concurrent.Future[String] = Future(Success(getEvent: PruebaEvent))
futureStep2: scala.concurrent.Future[String] = Future(<not completed>)

Ejemplo 4: Future y tratamiento de errores con recover

Supongamos que en las funciones getEvent y getTraffic se producen errores; dichos errores, tenemos que controlarlos y, en el caso que se produzcan, tenemos que retornar un valor determinado; para estos casos, empleamos la función recover.

import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent._
import ExecutionContext.Implicits.global
implicit val timeout = Timeout(2 seconds)
case class Resultado(evento: String, traffic: String)
def ejemplo1(): Unit = {
  def getEvent(parametro: String): Future[String] = {
    val resultadoGetEvent = Future {
    val resultado = "getEvent: " + parametro
    println(resultado)
    resultado
  }.recover {
    case e: Exception => "Valor getEvent por defecto"
  }
  resultadoGetEvent
}
def getTraffic(parametro: String): Future[String] = {
  val resultadoGetTraffic = Future {
    val resultado = "getTraffic: '" + parametro + "'"
    println(resultado)
    resultado
  }.recover {
    case e: Exception => "Valor getTraffic por defecto"
  }
  resultadoGetTraffic
}
val resultadoFutures = for {
  event <- getEvent("Parametro")
  traffic <- getTraffic(event)
} yield {
  Resultado(event, traffic)
}
val result = Await.result(resultadoFutures, timeout.duration)
println(s"->${result}")
}

La salida de la ejecución es la siguiente:

getEvent: Parametro
getTraffic: 'getEvent: Parametro'
->Resultado(getEvent: Parametro,getTraffic: 'getEvent: Parametro')

Ejemplo 5: Future y tratamiento de errores con recoverWith

El ejemplo anterior controla los errores pero, ¿qué hacemos cuando una excepción puede ser un resultado esperado?, o bien, ¿qué hacemos cuando se pueden producir muchos tipos de excepciones y queremos controlar el resultado para cada una de ellas?. En estos casos utilizamos la función recoverWith.

case class Resultado(evento: String, traffic: String)
def ejemplo3(): Unit = {
  def getEvent(parametro: String): Future[String] = {
    val resultadoGetEvent = Future {
    val resultado = "getEvent: " + parametro
    println(resultado)
    resultado
    throw new IllegalArgumentException(s"Error en parametro ${parametro}!")
  }.recoverWith {
    case ex: IllegalArgumentException => Future.successful(ex.getMessage)
    case e: Exception => Future.failed[String](new Exception("Error generico en getEvent"))
  }
  resultadoGetEvent
}
def getTraffic(parametro: String): Future[String] = {
  val resultadoGetTraffic = Future {
    val resultado = "getTraffic: '" + parametro + "'"
    println(resultado)
    resultado
  }.recoverWith {
    case ex: IllegalArgumentException => Future.successful(ex.getMessage)
    case e: Exception => Future.failed[String](new Exception("Error generico en getEvent"))
  }
  resultadoGetTraffic
}
val resultadoFutures = for {
  event <- getEvent("Parametro")
  traffic <- getTraffic(event)
  } yield {
    Resultado(event, traffic)
  }
  val result = Await.result(resultadoFutures, timeout.duration)
  println(s"->${result}")
}

El tratamiento del resultado de la función, se realiza empleando un for comprehension de forma secuencial y la función result de Await espera por la terminación de las dos funciones.

Otra posible opción para el control del resultado es utilizando algo como sigue:

resultadoFutures.onComplete {
  case Success(value) => println("Success: #" + value + "#")
  case Failure(e) => println("Failure:" + e)
}

La salida de la ejecución es la siguiente:

getEvent: Parametro
getTraffic: 'Error en parametro Parametro!'
->Resultado(Error en parametro Parametro!,getTraffic: 'Error en parametro Parametro!')

Ejemplo 6: Future y ejecución paralela con función fallbackTo

En ciertos momentos necesitamos que dos Future se ejecuten de forma paralela. En estos casos, utilizamos la función fallbackTo.

import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent._
import ExecutionContext.Implicits.global
implicit val timeout = Timeout(2 seconds)
def ejemplo2(): Unit = {
  def getEventforma2(parametro: String): Future[String] = {
    val resultadoGetEvent = Future {
    val resultado = "getEvent: " + parametro
    Thread.sleep(2000)
    println("->" + resultado)
    resultado
  }
  resultadoGetEvent
}
def getTrafficforma2(parametro: String): Future[String] = {
  val resultadoGetTraffic = Future {
    val resultado = "getTraffic: '" + parametro + "'"
    println("=>" + resultado)
    resultado
  }
  resultadoGetTraffic
}
// Se ejecuta en paralelo el future getEventforma2 y getTrafficforma2
// El resultado será el resultado del primer future que termine.
// El Await espera a que terminen los dos Future.
val futureResultado = getEventforma2("PruebaEvent") fallbackTo getTrafficforma2("PruebaTraffic")
val resultado = Await.result(futureResultado, timeout.duration)
println(s"->$resultado")
}

Un posible solución puede ser la siguiente pero, en función del tiempo de ejecución, se puede producir una excepción de tipo TimeoutException.

=>getTraffic: 'PruebaTraffic'
->getEvent: PruebaEvent
->getEvent: PruebaEvent

Ejemplo 7: Future y ejecución paralela con función zip

Otra forma de ejecutar Future en paralelo es utilizando la función zip y, con esta función, al terminar cada una de las funciones, realizar el tratamiento. El siguiente  ejemplo muestra un ejemplo de uso.

def ejemplo1(): Unit = {
case class Resultado(aEvent:String, aTraffic:String)
def getEvent(parametro: String): Future[String] = {
  val resultadoGetEvent = Future {
    val resultado = "getEvent: " + parametro
    println(s"getEvent=${resultado}")
    Thread.sleep(3000)
    resultado
  }
  resultadoGetEvent
}
def getTraffic(parametro: String): Future[String] = {
  val resultadoGetTraffic = Future {
    val resultado = "getTraffic: '" + parametro + "'"
    println(s"getTraffic=${resultado}")
    resultado
  }
  resultadoGetTraffic
}
val resultado = (getEvent("param1") zip getTraffic("param2")) map {
  case (event, traffic) => {
    println("#event=" + event + " #traffic=" + traffic)
    Resultado(aEvent=event, aTraffic=traffic)
  }
}
val result = Await.result(resultado, timeout.duration)
println("resultado forma1=" + result)
}

La salida de la ejecución del código es la siguiente:

getTraffic=getTraffic: 'param2'
getEvent=getEvent: param1
#event=getEvent: param1 #traffic=getTraffic: 'param2'
resultado forma1=Resultado(getEvent: param1,getTraffic: 'param2')

Ejemplo 8: Future y ejecución paralela con for comprehension

El objeto Future es de  tipo monádico con lo cual podemos emplear for comprehension de la siguiente forma:

def ejemplo2(): Unit = {
case class ResultadoMonada(tarea1: String, tarea2: String)
def getTareaAsincrona1(): String = {
  val resultadoTarea = "Hacemos una tarea asíncrona1"
  Thread.sleep(2000)
  resultadoTarea
}
def getTareaAsincrona2(): String = {
  val resultadoTarea = "Hacemos una tarea asíncrona2"
  resultadoTarea
}
val getTareaAsincrona1Future = Future {
  getTareaAsincrona1()
}
val getTareaAsincrona2Future = Future {
  getTareaAsincrona2()
}
val resultMonada = for {
  resultado1 <- getTareaAsincrona1Future
  resultado2 <- getTareaAsincrona2Future
} yield {
  ResultadoMonada(tarea1 = resultado1, tarea2 = resultado2)
}
val result = Await.result(resultMonada, timeout.duration)
println("resultado Monada=" + result)
}

La salida de la ejecución del código es la siguiente:

resultado Monada=ResultadoMonada(Hacemos una tarea asíncrona1,Hacemos una tarea asíncrona2)

Ejemplo 9: Tratamiento de tareas Future para aquella que acabe primero

Hay necesidades funcionales en las cuáles necesitamos lanzar varias tareas y tratar aquel Future cuya ejecución termine el primero, despreciando al resto. En estos casos, empleamos la función firstCompletedOf. En el siguiente ejemplo, tomando las funciones del apartado anterior, el tratamiento del primer Future en terminar sería el siguiente:

// Arranca la tareaProgramada después de 200 milisegundos
val tareaProgramada1 = after(200 millis, using=system.scheduler)(getTareaAsincrona1Future)
val result1 = Future firstCompletedOf(Seq(tareaProgramada1, getTareaAsincrona2Future))
println(s"Resultado Prueba1:${result1}")

Una de las salidas de la ejecución del código anterior es el siguiente:

Success(Hacemos una tarea asíncrona2)

Otra posible codificación puede ser la siguiente:

[...]
// Tratamiento "quien acabe primero": resultado Exception porque future2 tarda mas en terminar.
val tareaProgramada2 = after(200 millis, using=system.scheduler)(Future.failed(new IllegalStateException("error!")))
val future2 = Future { Thread.sleep(1000); "foo" }
val result2 = Future firstCompletedOf(Seq(tareaProgramada2, future2))
result2 onComplete{
  case Success(resultado) => println(s"resultado2=${resultado}")
  case Failure(error) => println(s"error2=${error}")
}

Al lanzar la tareaProgramada2 una excepción, la salida de la ejecución del código anterior es la siguiente:

error2=java.lang.IllegalStateException: error!

Para finalizar el tipo de ejemplo, otra ejecución puede ser la siguiente:

val tareaProgramada3 = after(200 millis, using=system.scheduler)(Future.failed(new IllegalStateException("error!")))
val future3 = Future { "foo" }
val result3 = Future firstCompletedOf(Seq(tareaProgramada3, future3))
result3 onComplete{
  case Success(resultado) => println(s"resultado3=${resultado}")
  case Failure(error) => println(s"error3=${error}")
}

La salida del anterior snippet de código es la siguiente:

resultado3=foo

Estos son los ejemplos que presento, si al lector interesado se le ocurre plantear otro ejemplo, o bien, plantear cualquier otra alternativa, estaré encantado de compartirlo.

«Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala

En la presente entrada, «Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala», describo cómo definir un proceso de streaming con Apache Spark con una fuente de datos Apache Kafka definido en lenguaje Scala.

La estructura del artículo está compuesta por los siguientes apartados:

  1.  Apache Kafka. En este apartado realizaré una breve presentación de Kafka, instalación y arranque de los elementos necesarios para el ejemplo.
  2.  Apache Spark. En este apartado realizaré una breve descripción de Spark streaming y la descripción del ejemplo a presentar.

Apache Kafka

Apache Kafka es aquella herramienta que permite construir pipeline de datos en tiempo real y streaming de aplicaciones. Apache kafka es tolerante a fallos y escalable horizontalmente.

Instalación.

El proceso de instalación es un proceso sencillo, simplemente, hay que realizar lo siguiente:

  1. Descarga del fichero comprimido con la herramienta.
  2. Descompresión del fichero descargado en una carpeta.
  3. Acceder a la carpeta principal y ejecutar los ficheros de inicio.

Para aquel lector interesado, existen varias imágenes de contenedores Docker de Kafka.

Inicio del Zookeeper y Kafka

Para iniciar Kafka es necesario ejecutar dos comandos: el primero, iniciar Zookeeper; y, el segundo, inicio del servidor de kafka. Para cada operación, es necesario la apertura de una consola. Así, los comandos son los siguientes:

  • Arranque de Zookeeper. La configuración de Zookeeper se encuentra en el fichero de configuración zookeeper.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar Zookeeper es el siguiente:
>./bin/zookeeper-server-start.sh config/zookeeper.properties
  • Arranque de kafka Server. La configuración de Apache Server se encuentra en el fichero de configuración server.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar el servidor de Kafka es el siguiente:
>./bin/kafka-server-start.sh config/server.properties

Creación de un topic de prueba

Apache Kafka trabaja con topics para el intercambio de información desde los productores hasta los consumidores. El comando para la creación del topic es el siguiente:

> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Las opciones del script tienen el siguiente significado:

  • –create: opción de creación del topic.
  • –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
  • –replication-factor 1: opción para la definición del número de replicas del topic; en nuestro caso, valor 1.
  • –partitions 1: opción para defininir el número de particiones del topic; en nuestro caso, valor 1.
  • –topic test: nombre del topic a crear; en nuestro caso, test.

Creación de un productor.

Para la creación de un productor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un productor; y, desde ésta, poder escribir aquel texto que se quiera generar.

El comando para el inicio del productor es el siguiente:

> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Una vez ejecutado, la consola se queda a la espera para la introducción del texto deseado.

Creación de un consumidor.

Para la creación de un consumidor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un consumidor; y, desde esta, poder leer aquel texto que ha generado desde el productor.

El comando para el inicio del consumidor es el siguiente:

> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Las opciones del script tienen el siguiente significado:

  • –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
  • –topic test: nombre del topic a crear; en nuestro caso, test.
  • –from-beginning: opción para la definición del tipo de recepción.

Prueba de funcionamiento

Para la realización de un prueba de un productor y un consumidor, no hay mas que arrancar el productor en una terminal; arrancar el consumidor en una segunda terminal; y, por último,  escribir en el productor aquel texto que se quiera enviar al consumidor; como resultado de la ejecución, se visualizará en la terminal del consumidor el texto insertado en la terminal del productor.

Apache Spark

Apache Spark es un cluster de computación de proposito general el cual provee API en varios lenguajes como Java, Python y Scala, además de un motor  optimizado para la generación de gráficos. También soporta herramientas de alto nivel como son: Spark SQL, para el tratamiento de estructuras de datos; Spark MLLib, para machine learning; GraphX para el proceso gráfico y, por último, Spark Streaming.

Apache Spark Streaming es una extensión del core de Apache Spark con un API de alto rendimiento, escalable con un proceso de ingesta de datos tolerante a fallos. Los datos pueden ser ingestados desde distintas fuentes como son Kafka, Flume, un socket TCP,…; una vez ingestado, pueden ser procesados por funciones de orden superior; y, por último, el resultado del proceso puede ser almacenado en una base de datos, un fichero HDFS o un dashboard.

Gráficamente, Spark Streaming se puede definir de la siguiente forma:

Definición del problema

El problema que planteo es el siguiente: conexión de Apache Streaming con Apache kafka a traves de un topic con nombre test para poder cuantificar el número de palabras introducidas en un mensaje Kafka enviado al topic test desde un productor.

Definición de dependecias

Las dependencias necesarias para la realización del programa de interconexión son las siguientes:

  1. Definición de la dependecia de Spark Core
  2. Definición de la dependeicna con Spark Streaming
  3. Definición del conector de Spark con Kafka.

El objeto con las dependencias queda como sigue:

object Dependencies {
  val sparkVersion = "2.3.1"
  lazy val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion
  lazy val sparkStreamming = "org.apache.spark" %% "spark-streaming" % sparkVersion
  lazy val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
}

El fichero build.sbt queda definido como sigue:

import Dependencies._
import sbt.Keys.libraryDependencies
ThisBuild / scalaVersion := "2.11.9"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"
lazy val root = (project in file("."))
.settings(
name := "ejem-spark",
scalacOptions += "-Ypartial-unification", // 2.11.9+
libraryDependencies += sparkCore,
libraryDependencies += sparkStreamming,
libraryDependencies += sparkStreamingKafka

Solución en Scala

La funcionalidad con la conexión a Kafka consiste en lo siguiente: definición del contexto de Spark y Spark Streaming, definición de la configuración a Kafka, creación del stream con la utilidad de Kafka, procesamiento del resultado; una vez definido, se realiza el arranque del contexto SparkStreaming y se queda a la espera de su finalización.

El código es el siguiente:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object EjemSparkStreamming {
  def exampleStreamming(): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("EjemSparkStreamming-kafka")
    val ssc = new StreamingContext(conf, Seconds(2))
    val topics = "test" // lista de Topic de Kafka
    val brokers = "localhost:9092" // broker de Kafka
    val groupId = "0" // Identificador del grupo.
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
       ConsumerConfig.GROUP_ID_CONFIG -> groupId,
       ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
       ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    val lines = messages.map(_.value)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
  def main(args: Array[String]): Unit = {
    exampleStreamming()
  }
}

La configuración de conexión a Kafka se define en las variables topics, brokers y groupId. Topics, puede tener una lista de nombres de topic separados por comas; brokers, el endpoint de kafka; y, groupId, del grupo de topics, en nuestro caso no hemos definido. Todos los parámetros, se definen en la estructura Map kafkaParams.

KafkaUtils es aquel componente que realiza la definición del stream al cual se le pasa el contexto de Streaming, las estrategias de localización de los topic y la estrategia de consumidores.

Ejecución y prueba

Para realizar pruebas es necesario tener la infraestructura de Apache Kafka levantada y un productor arrancado; y, por la parte de Spark, arrancaremos la aplicación de forma normal. Así, ejecutaremos los siguiente pasos:

  • En la consola del productor, escribiremos el siguiente texto: «esto es una prueba de Streaming. esto es una prueba»
  • En la consola del programa, el cual estará ejecutándose constantemente, cada dos segundos, realizará la comprobación del topic con el siguiente escritura en la consola:
[...]
19/06/06 17:02:34 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 1329 bytes result sent to driver
19/06/06 17:02:34 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 8 ms on localhost (executor driver) (1/1)
-------------------------------------------
Time: 1559833354000 ms
-------------------------------------------
(es,2)
(una,2)
(Streaming.,1)
(de,1)
(esto,2)
(prueba,2)
[...]

 

Patrón Type Class y Spark

En las pasadas entradas centradas en el patrón type class con título Patrón Type Class  y Patrón Type Class: definición de leyes y test, realicé una descripción de la estructura de dicho patrón. En la presente entrada, Patrón Type Class y Spark, me centraré en la definición de un API mediante el patrón type class utilizando Apache Spark.

Apache Spark es aquel motor de procesamiento y análisis de datos. Apache Spark es una solución muy utilizada en el ámbito del Big Data.

El problema del ejercicio solucionada con Apache Spark consiste en extraer los datos existentes en unas tablas de una base de datos, crear un fichero en formato parquet de los datos; y, dicho fichero, dejarlo en una estructura de directorios. Todo ello, lo más configurable posible.

La definición de las tablas fuentes de datos son tablas, con nombre y campos con un enfoque didáctico, es decir, son tablas con nombre no relevantes. Definiré dos tablas: la tabla libros, con los siguientes campos: codigo, nombre y población; y, la tabla dat_country, con los mismos campos.

El tipo básico del ejemplo es aquel tipo que puede trabajar con el valor parametrizado cuya definición es la siguiente: type Postgresql[T] = T. El tipo lo he identificado con Postgres porque es con la base de datos con la cual realizaré el ejercicio.

La definición del fichero sbt del ejecercicio es la siguiente:

import sbt.Keys.libraryDependencies
name := "EjemploTypeClassSpark"
version := "1.0"
scalaVersion := "2.11.9"
scalacOptions += "-Ypartial-unification" // 2.11.9+
val spark_version = "2.3.1" 
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % spark_version,
  "org.apache.spark" %% "spark-sql" % spark_version,
  "com.typesafe" % "config" % "1.3.2",
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

La definición del API con nombre Engine estará formada por una única función con nombre dataExtraction la cual realizará la extración de datos, creación de un fichero en formato parquet y escritura en un path sin datos de retorno, es decir, de tipo Unit. Así, la definición de la parte funcional del type class es la siguiente:

import java.util.{ Properties}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import conf.Configuration
import typeclass.DTO.DataConfiguration
import util.Util
trait Engine[P[_]] {
  def dataExtraction(): P[Unit]
}
object Engine extends EngineInstances with EngineSyntax
trait EngineSyntax{
  object syntax{
    def |-> [P[_]]()(implicit BDI:Engine[P]): P[Unit] = BDI.dataExtraction()
  }
}

En el snippet anterior, definimos los siguientes elementos: el trait Engine del API, la definición del objeto Engine que hereda de las instancias y se comporta como un elemento como EngineSyntax. En este ejercicio, por simplificar, se omite la definición del trait con la definición del lenguaje y las leyes matemáticas que debe de cumplir.

La definición de las instancias con los efectos de lado es la siguiente:

trait EngineInstances{
  def apply[P[_]](implicit BDI:Engine[P]): Engine[P] = BDI
  import typeclass.Types.Postgresql
  implicit object EnginePostgress extends Engine[Postgresql]{
  def logger = LoggerFactory.getLogger(this.getClass)
  def loadTableField(nombreTable:String): List[String] = {
    nombreTable match {
       case "pruebas.libros" => List("codigo","nombre","poblacion")
       case "pruebas.dat_country" => List("codigo","nombre","poblacion")
       case _ => List()
   }
 }
def loadTableToParquet( spark:SparkSession, dataConfiguration:DataConfiguration ): Unit = {
  val engineSpark = spark
    .read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", dataConfiguration.url)
    .option("dbtable", dataConfiguration.nameTable)
    .option("user", dataConfiguration.user )
    .option("password", dataConfiguration.password)
    .option("fetchsize", "1000")
    .load()
    val parameter: List[String] = loadTableField(dataConfiguration.nameTable) //: _*
    engineSpark.select( parameter.toSeq.head, parameter.tail.toSeq: _* ).write.format("parquet").save( Util.getFilePathTarget(dataConfiguration.nameBucket ,dataConfiguration.nameTable)   )
}
def runSpark(url:String, user:String, password:String, numTables:Int, prop:Properties): Either[String, Unit] = {
  val spark = SparkSession.builder.appName("EjemploTypeClassSpark").getOrCreate()
  for(num <- 1 to numTables){
    try{
      val nameTable = prop.getProperty("table_" + num)
      if(!nameTable.equals(null) && !nameTable.equals("")){
        logger.info(s"[*****] Número tabla: ${num}, nombre tabla: ${nameTable}.")
        loadTableToParquet(spark, DataConfiguration(numTables=num, nameTable=nameTable, url=url, user=user, password=password, nameBucket = prop.getProperty("bucketName")) )
      }else{
        logger.info(s"[*****] Número tabla: table_${num} VACÍA")
      }
   } catch {
     case ex: Exception => {
       logger.info(s"[*****] Error en la carga de la tabla con número table_${num}")
       logger.error(s"Exception: ${ex.getMessage}")
     }
   }
 }
 spark.close()
 Right(Unit)
}
override def dataExtraction(): Postgresql[Unit] = {
  for{
    environment <- Configuration.loadEnvironmentVariables.right
    properties <- Configuration.loadProperties(environment.configuration).right
    _ <- runSpark(environment.url, environment.user , environment.password, properties.numTables, properties.properties ).right
  }yield{
    Right(Unit)
  }
 }
}
[...]
}

En el snippet anterior, definimos un trait EngineInstances con un constructor y un objeto implícito EnginePostgress el cual hereda del API Engine. En este trait, si definimos otro tipo de parámetro, como por ejemplo:Either[String,T] definido como type Oracle[T] = Either[String,T], sería el lugar para su implementación.

El objeto EnginePostgress define la función dataExtraction la cual define el programa con las siguientes sentencias: primero, larga de variables de entorno; segunda, carga de la configuración de los ficheros de properties; tercero, las operaciones con Spark; y, por último, el retorno de un elemento de tipo Unit. Las tres funciones son funciones que retornan un contenedor binario de tipo Either.

La función runSpark es aquella función que realiza la constructuctión de una sesión de Spark para realizar una operación sobre una tabla configurada; dicha función, se realiza en la función loadTableToParquet.

La función loadTableToParquet es aquella función que realiza la carga de la configuración del motor Spark para una tabla de una base de datos determinada, extrae los datos en formato parquet y deja el fichero en una ubicación determinada.

La aplicación cliente del API Engine es la siguiente:

import typeclass.Engine.syntax._
import typeclass.Types.Postgresql
object App extends App{
  val startTime: Long = System.currentTimeMillis()
  |->[Postgresql]()
}

El fichero properties con la configuración con el nombre de las tablas, el número de tablas a extraer y el path con el directorio en donde se almacena el resultado es el siguiente:

bucketName=~/tmp/
num_tables=2
table_1=pruebas.libros
table_2=pruebas.dat_country

La variables de entorno a configurar para la ejecución de la aplicación son las siguientes:

export url=jdbc:postgresql://localhost:5432/prueba
export user=postgres
export password=password
export configuration=~/workspace/EjemploTypeClassSpark/src/main/resources/configuration_bdi.properties

Los componentes software para la carga de las variables de entorno y carga de ficheros properties nos las describo en la entrada para reducir el tamaño y por centrar el ejemplo en el patrón type class.

Para finalizar y poder ejecutar la aplicación en local con Apache Spark se ejecuta con el siguiente comando:

cd ~/scala/spark-2.3.1-bin-hadoop2.7/bin
spark-submit --driver-class-path postgresql-42.1.4.jre6.jar --class "App" --master local[2] ~/workspace/EjemploTypeClassSpark/target/scala-2.11/ejemplotypeclassspark_2.11-1.0.jar

El resultado de la ejecución es la creación del fichero con los datos extraídos en la carpeta ~/tmp/ del sistema de directorios.