En la presente entrada, «Apache Kafka & Apache Spark: un ejemplo de Spark Streaming en Scala», describo cómo definir un proceso de streaming con Apache Spark con una fuente de datos Apache Kafka definido en lenguaje Scala.
La estructura del artículo está compuesta por los siguientes apartados:
- Apache Kafka. En este apartado realizaré una breve presentación de Kafka, instalación y arranque de los elementos necesarios para el ejemplo.
- Apache Spark. En este apartado realizaré una breve descripción de Spark streaming y la descripción del ejemplo a presentar.
Apache Kafka
Apache Kafka es aquella herramienta que permite construir pipeline de datos en tiempo real y streaming de aplicaciones. Apache kafka es tolerante a fallos y escalable horizontalmente.
Instalación.
El proceso de instalación es un proceso sencillo, simplemente, hay que realizar lo siguiente:
- Descarga del fichero comprimido con la herramienta.
- Descompresión del fichero descargado en una carpeta.
- Acceder a la carpeta principal y ejecutar los ficheros de inicio.
Para aquel lector interesado, existen varias imágenes de contenedores Docker de Kafka.
Inicio del Zookeeper y Kafka
Para iniciar Kafka es necesario ejecutar dos comandos: el primero, iniciar Zookeeper; y, el segundo, inicio del servidor de kafka. Para cada operación, es necesario la apertura de una consola. Así, los comandos son los siguientes:
- Arranque de Zookeeper. La configuración de Zookeeper se encuentra en el fichero de configuración zookeeper.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar Zookeeper es el siguiente:
>./bin/zookeeper-server-start.sh config/zookeeper.properties
- Arranque de kafka Server. La configuración de Apache Server se encuentra en el fichero de configuración server.properties; para nuestro caso, empleamos la configuración por defecto. El comando para iniciar el servidor de Kafka es el siguiente:
>./bin/kafka-server-start.sh config/server.properties
Creación de un topic de prueba
Apache Kafka trabaja con topics para el intercambio de información desde los productores hasta los consumidores. El comando para la creación del topic es el siguiente:
> ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Las opciones del script tienen el siguiente significado:
- –create: opción de creación del topic.
- –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
- –replication-factor 1: opción para la definición del número de replicas del topic; en nuestro caso, valor 1.
- –partitions 1: opción para defininir el número de particiones del topic; en nuestro caso, valor 1.
- –topic test: nombre del topic a crear; en nuestro caso, test.
Creación de un productor.
Para la creación de un productor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un productor; y, desde ésta, poder escribir aquel texto que se quiera generar.
El comando para el inicio del productor es el siguiente:
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Una vez ejecutado, la consola se queda a la espera para la introducción del texto deseado.
Creación de un consumidor.
Para la creación de un consumidor y realización de las pruebas, utilizaremos la herramienta de línea de comando con la cual nos permite el arranque de un consumidor; y, desde esta, poder leer aquel texto que ha generado desde el productor.
El comando para el inicio del consumidor es el siguiente:
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Las opciones del script tienen el siguiente significado:
- –bootstrap-server localhost:9092 : opción para la definición del endpoint del servidor.
- –topic test: nombre del topic a crear; en nuestro caso, test.
- –from-beginning: opción para la definición del tipo de recepción.
Prueba de funcionamiento
Para la realización de un prueba de un productor y un consumidor, no hay mas que arrancar el productor en una terminal; arrancar el consumidor en una segunda terminal; y, por último, escribir en el productor aquel texto que se quiera enviar al consumidor; como resultado de la ejecución, se visualizará en la terminal del consumidor el texto insertado en la terminal del productor.
Apache Spark
Apache Spark es un cluster de computación de proposito general el cual provee API en varios lenguajes como Java, Python y Scala, además de un motor optimizado para la generación de gráficos. También soporta herramientas de alto nivel como son: Spark SQL, para el tratamiento de estructuras de datos; Spark MLLib, para machine learning; GraphX para el proceso gráfico y, por último, Spark Streaming.
Apache Spark Streaming es una extensión del core de Apache Spark con un API de alto rendimiento, escalable con un proceso de ingesta de datos tolerante a fallos. Los datos pueden ser ingestados desde distintas fuentes como son Kafka, Flume, un socket TCP,…; una vez ingestado, pueden ser procesados por funciones de orden superior; y, por último, el resultado del proceso puede ser almacenado en una base de datos, un fichero HDFS o un dashboard.
Gráficamente, Spark Streaming se puede definir de la siguiente forma:
Definición del problema
El problema que planteo es el siguiente: conexión de Apache Streaming con Apache kafka a traves de un topic con nombre test para poder cuantificar el número de palabras introducidas en un mensaje Kafka enviado al topic test desde un productor.
Definición de dependecias
Las dependencias necesarias para la realización del programa de interconexión son las siguientes:
- Definición de la dependecia de Spark Core
- Definición de la dependeicna con Spark Streaming
- Definición del conector de Spark con Kafka.
El objeto con las dependencias queda como sigue:
object Dependencies { val sparkVersion = "2.3.1" lazy val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion lazy val sparkStreamming = "org.apache.spark" %% "spark-streaming" % sparkVersion lazy val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0" }
El fichero build.sbt queda definido como sigue:
import Dependencies._ import sbt.Keys.libraryDependencies ThisBuild / scalaVersion := "2.11.9" ThisBuild / version := "0.1.0-SNAPSHOT" ThisBuild / organization := "com.example" ThisBuild / organizationName := "example" lazy val root = (project in file(".")) .settings( name := "ejem-spark", scalacOptions += "-Ypartial-unification", // 2.11.9+ libraryDependencies += sparkCore, libraryDependencies += sparkStreamming, libraryDependencies += sparkStreamingKafka
Solución en Scala
La funcionalidad con la conexión a Kafka consiste en lo siguiente: definición del contexto de Spark y Spark Streaming, definición de la configuración a Kafka, creación del stream con la utilidad de Kafka, procesamiento del resultado; una vez definido, se realiza el arranque del contexto SparkStreaming y se queda a la espera de su finalización.
El código es el siguiente:
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka010._ object EjemSparkStreamming { def exampleStreamming(): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("EjemSparkStreamming-kafka") val ssc = new StreamingContext(conf, Seconds(2)) val topics = "test" // lista de Topic de Kafka val brokers = "localhost:9092" // broker de Kafka val groupId = "0" // Identificador del grupo. // Create direct kafka stream with brokers and topics val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]) val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) val lines = messages.map(_.value) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() // Start the computation ssc.start() ssc.awaitTermination() } def main(args: Array[String]): Unit = { exampleStreamming() } }
La configuración de conexión a Kafka se define en las variables topics, brokers y groupId. Topics, puede tener una lista de nombres de topic separados por comas; brokers, el endpoint de kafka; y, groupId, del grupo de topics, en nuestro caso no hemos definido. Todos los parámetros, se definen en la estructura Map kafkaParams.
KafkaUtils es aquel componente que realiza la definición del stream al cual se le pasa el contexto de Streaming, las estrategias de localización de los topic y la estrategia de consumidores.
Ejecución y prueba
Para realizar pruebas es necesario tener la infraestructura de Apache Kafka levantada y un productor arrancado; y, por la parte de Spark, arrancaremos la aplicación de forma normal. Así, ejecutaremos los siguiente pasos:
- En la consola del productor, escribiremos el siguiente texto: «esto es una prueba de Streaming. esto es una prueba»
- En la consola del programa, el cual estará ejecutándose constantemente, cada dos segundos, realizará la comprobación del topic con el siguiente escritura en la consola:
[...] 19/06/06 17:02:34 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 1329 bytes result sent to driver 19/06/06 17:02:34 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 8 ms on localhost (executor driver) (1/1) ------------------------------------------- Time: 1559833354000 ms ------------------------------------------- (es,2) (una,2) (Streaming.,1) (de,1) (esto,2) (prueba,2) [...]