Skip to content

java

Apache Kafka (y 4)

En este artículo se muestra un ejemplo de construcción de un cliente Java para Apache Kafka que utiliza Kafka Streams. Al igual que en artículos anteriores de esta serie, la documentación oficial representa un buen punto de partida, por lo que sigue el guión propuesto en la misma.

Preparación

Para la ejecución de este ejemplo se presupone que se encuentra arrancado un servidor local de Kafka con la configuración por defecto según lo explicado en artículos anteriores:

El ejemplo leerá de un tópico de entrada y escribirá en un tópico de salida. Más concretamente leerá de un stream de entrada líneas de texto plano y escribirá en un stream de salida las distintas palabras utilizadas junto con el número total de ocurrencias de cada una de ellas.

El tópico de entrada se llamará “streams-plaintext-input” y puede crearse desde línea de comandos con la siguiente instrucción:

El tópico de salida se llamará “streams-wordcount-output” y puede crearse desde línea de comandos con la siguiente instrucción:

Lo único reseñable en este paso es que el segundo tópico se crea con una política de compactación. Lo que quiere decir que Kafka garantiza que el último valor asignado a cada clave estará disponible, pero no necesariamente el histórico de todos los valores asignados a cada clave. Entendiendo que en este caso concreto que las claves serán las distintas palabras encontradas en el texto, y los valores el número de ocurrencias de las mismas.

Maven

El API de Kafka Streams está publicado en Maven bajo las siguientes coordenadas:

Kafka Streams

El API de Kafka Streams permite crear clientes que procesen streams.

El código del ejemplo, un poco más extenso de lo habitual, define las propiedades de la conexión, se conecta al tópico de entrada, define la operación a realizar para alimentar el tópico de salida, arranca el proceso, y se queda a la espera hasta que se pulsa Ctrl+C para terminar en orden. Lo interesante es notar que se utiliza el estilo del API nativo de Java para la manipulación de Streams. El resto del API de Kafka apenas consiste en llamar a los métodos start y close.

Este ejemplo se incluye dentro de la propia distribución de Kafka, así que puede arrancarse directamente desde línea de comandos con la siguiente instrucción:

La aplicación se queda a la espera de que se alimente el tópico de entrada, algo que puede hacerse creando un productor con la siguiente línea de comandos:

El productor se queda esperando a que se introduzca algún texto por teclado, de forma que todas las líneas introducidas por teclado se envían al tópico “streams-plaintext-input”.

El último paso es crear un consumidor que examine el tópico de salida, algo que puede hacerse con la siguiente línea de comandos:

El consumidor muestra por pantalla el contenido del tópico “streams-wordcount-output” a medida que se va almacenando información en el mismo.

Si todo ha ido correctamente, cuando se introduzca algún texto en el stream de entrada, deberá verse el conteo de palabras en el stream de salida actualizado.

Apache Kafka (3)

En este artículo se muestran un par de ejemplos de uso de la librería cliente para Java de Apache Kafka. El objetivo es crear clientes básicos que actúen como productores o consumidores, que son dos de los casos de uso más habituales. Los ejemplos de la documentación oficial proporcionan un punto de partida bastante sólido, por lo que este artículo sigue el guión propuesto en los mismos.

Para ejecutar los ejemplos se debe tener levantado el cluster creado en el artículo anterior con un tópico de nombre “prueba”.

Maven

La librería cliente de Kafka está distribuida en Maven bajo las siguientes coordernadas:

Producer API

Este API permite crear clientes que publiquen registros.

El código de ejemplo anterior crea un productor que genera dos registros que publica en el tópico “prueba”. Cada registro compuesto de una clave y valor, ambos de tipo String.

La clase principal es KafkaProducer. Una clase thread safe reutilizable que admite una configuración con una gran cantidad de opciones, por lo que el método preferido para instanciarla es pasarle el clásico objeto de tipo Properties de Java en el constructor con todos los parámetros.

El productor funciona de forma asíncrona no bloqueante. La llamada al método send no envía los registros al servidor de forma inmediata, los almacena de forma local. Los registros se envían al servidor en un thread aparte en función de la configuración. Esta estrategia permite reducir el tráfico de red, enviando varios registros en un solo paquete, y actuando acorde al caso de uso habitual de los productores, que notifican eventos de forma masiva al servidor. No obstante, un punto importante de esta forma de funcionamiento es recordar llamar al método close, o flush, para garantizar que todos los registros almacenados de forma local se envían al servidor antes de terminar la conexión.

El método send retorna un Future que se completa con información acerca del envío. En dicho Future se incluye el tópico, el timestamp y el offset asignado al mensaje. Si se quiere esperar de forma síncrona bloqueante a que se complete el envío se puede llamar al métogo get del Future. Otra opción es pasar una función callback como parámetro al método send. En este último caso el envío será síncrono y la función callback se invocará cuando se complete el envío.

En cuanto a los parámetros de configuración, en el ejemplo se utilizan algunos de ellos. “acks” establece si el productor necesita que se confirme la escritura efectiva del registro en el tópico, el valor “0” indica que no se requiere ninguna confirmación, el valor “1” indica que se debe garantizar que el bróker líder del cluster lo persiste, y “all” indica que se debe garantizar la escritura efectiva por todos los brokers del cluster. “retries” indica el número de reintentos a realizar en caso de pérdida de conexión, se utiliza cero porque un número mayor puede dar lugar a registros duplicados en determinados escenarios como se verá más adelante. “batch.size” indica el número máximo de registros que se pueden acumular en local antes de su envío para cada partición, a más registros más memoria. “buffer.memory” indica el tamaño máximo de memoria que se debe utilizar para almacenar los registros de manera local, si se alcanza dicho máximo el método send se bloqueará hasta que vuelva a haber memoria disponible. “linger.ms” establece un tiempo de espera para enviar registros al servidor, lo que permite que se almacenen registros de forma local para enviarlos todos en un solo paquete de forma más eficiente. “key.serializer” y “value.serializer” indican las clases que deben utilizarse para serializar los valores de las claves y los valores en los registros.

Como se ha comentado anteriormente, en escenarios concretos de error o sobrecarga en el tráfico de red puede ocurrir que se dupliquen registros si se establece un número de reintentos mayor de cero. Para evitar este efecto no deseado, el productor puede configurarse estableciendo el parámetro “enable.idempotence” a “true” para trabajar en modo idempotente, un modo que garantiza que los registros no se duplican. Pero tampoco es la panacea, la garantía sólo se establece para una conexión concreta de un productor concreto. El funcionamiento de este modo se basa en la generación de un ID único para cada productor y un secuencial con el número de registros publicados. El servidor puede ignorar mensajes con secuenciales menores al último persistido con éxito, evitando así que se vuelva a publicar un mensaje ya publicado antes de una pérdida de conexión.

El productor también puede utilizarse para generar eventos dentro de una transacción, garantizando que se escriben todos, o ninguno, de los registros publicados dentro de los límites de la transacción. Aunque este modo de funcionamiento establece algunas restricciones sobre la configuración de los tópicos, que deben tener un factor de replicación de tres o más, y exigen un mínimo de dos réplicas sincronizadas. Y de igual forma exige que los consumidores se configuren para consumir exclusivamente registros completamente persistidos y sincronizados en el cluster. El API para este modo es síncrono y eleva excepciones en caso de error.

En el código de ejemplo anterior se crea un productor que publica dos registros dentro de una transacción. El patrón que sigue es el uso de los clásicos métodos begin y commit para delimitar la transacción. El método initTransactions garantiza que no se inicia una transacción nueva sin haber terminado la anterior. Y el método abortTransaction se utiliza para abortar la transacción en curso en caso de error.

Consumer API

Este API permite crear clientes que consuman registros.

El código de ejemplo anterior crea un consumidor que consume los registros publicados en el tópico “prueba”. La clase principal es KafkaConsumer , que a diferencia de la clase del productor, NO es thread safe.

Un aspecto importante a tener en cuenta para la programación de consumidores es la noción de offset. Como ya se explicó en artículos anteriores, un offset es un secuencial que indica la posición de un mensaje dentro de un tópico. Los consumidores mantienen dos offsets, por una parte el offset del próximo mensaje que esperan recibir, y por otra parte el offset del último mensaje consumido de forma efectiva. Es decir, que el cliente puede decidir e indicar al servidor el offset del último mensaje que considera consumido. Más concretamente, llamando de forma manual al método commitSync o commitAsync. En caso de caída del consumidor, el servidor volverá a enviarle mensajes, cuando se vuelva a levantar, a partir del último offset confirmado.

Otro aspecto importante, también comentado en artículos anteriores, es el concepto de grupo de consumidores. Cada consumidor dentro de un grupo es asignado por el servidor a una partición de un tópico. Es decir, que los mensajes de un tópico son consumidos por un único consumidor. Si el consumidor cae, el servidor asignará automáticamente la partición a otro consumidor del grupo de consumidores. Si todos los consumidores de un mismo tópico se adhieren al mismo grupo de consumidores entonces Kakfa se comporta como una cola, de igual forma que la mayoría de sistemas de mensajería, donde un mensaje es consumido por un único cliente. Si cada consumidor de un mismo tópico tiene un identificador de grupo de consumidores distinto entonces Kakfa se comporta como un mecanismo de publicación/suscripción, donde cada mensaje es consumido por todos los clientes.

La clase KafkaConsumer permite asignar un consumidor a una partición concreta, recibir notificaciones cuando se produzca un cambio en la asignación de particiones, y realizar otras tareas relacionadas con la gestión de grupos de consumidores.

Respecto a los parámetros de configuración, estos se establecen a través de una instancia de la clase Properties como en la clase del productor. Los parámetros más relevantes en el ejemplo son “group.id”, que contiene el nombre del grupo de consumidores, junto con “enable.auto.commit” y “auto.commit.interval.ms”, que indican que el consumidor quiere que se comunique al servidor cada N milisegundos que los mensajes recibidos se consideran completamente consumidos de forma automática, sin que el consumidor tenga que llamar manualmente a commitSync o commitAsync.

El método poll consume mensajes del servidor. Si hay mensajes disponibles retorna inmediatamente dichos mensajes. Si no hay mensajes disponibles se bloquea el tiempo indicado. Si llegan mensajes durante la espera se desbloquea y retorna los mensajes recibidos. Si no llegan mensajes durante la espera retorna una lista vacía al expirar el tiempo indicado. Opcionalmente se puede indicar a través de la propiedad “max.poll.records” el número máximo de mensajes que el cliente quiere procesar de una sola vez. A diferencia de la mayoría de plataformas, donde el servidor tiene la responsabilidad de controlar todo lo que hacen sus clientes, Kafka utiliza una estrategia distinta, con clientes que implementan bastante lógica de control.

En el código de ejemplo anterior se crea un consumidor que consume mensajes y decide de forma manual cuando han sido efectivamente consumidos llamando a commitSync con el offset del último mensaje consumido, o más precisamente, con el offset del siguiente mensaje, ya que el método requiere sumar 1 al último offset procesado. En cualquier caso, lo que hay que tener claro es que nunca se puede garantizar un 100% de consistencia. Por ejemplo, el proceso podría caer antes de llamar a commitSync. En cuyo caso algunos mensajes se habrían procesado, pero no le llegaría la notificación a Kafka. Al volver a arrancar el consumidor, Kafka volvería a enviar los mismos últimos mensajes no confirmados por el consumidor.

Por último, comentar que la clase KafkaConsumer ofrece métodos como seek, seekToBeginning y seekToEnd para dejar que el consumidor especifique de forma precisa la posición a partir de la cual quiere consumir mensajes, permitiendo por ejemplo volver a consumir mensajes ya procesados, o ignorar mensajes y sólo consumir los nuevos.

Apache Kafka (2)

Continuando con la serie dedicada a Apache Kafka, en este artículo se revisa el proceso de instalación y algunas de las tareas básicas que pueden realizarse para familiarizarse con el software. La documentación es bastante efectiva a este respecto, por lo que este artículo se limita a seguir paso por paso el guión propuesto en la misma. El objetivo es instalar y verificar el funcionamiento de Kakfa de forma local en una máquina Windows partiendo de cero y sin utilizar Docker.

Descarga

La versión actual de Kafka en el momento de escribir este artículo es la 2.0.0. Puede descargarse desde la propia página de descargas oficial del proyecto. El descargable es un fichero comprimido en formato .tgz que puede descomprimirse en cualquier directorio.

Instalación

El software descomprimido se distribuye en cuatro directorios. El directorio bin contiene scripts para ser ejecutados desde línea de comandos, tanto para Windows en formato .bat como para UNIX en formato .sh. El directorio config contiene los ficheros de configuración en formato .properties. El directorio libs contiene las dependencias en forma de librerías de Java en formato .jar. Y el directorio site-docs contiene toda la documentación comprimida en formato .tgz.

El único requerimiento para ejecutar Kafka es tener instalada una máquina virtual de Java 1.8 o superior. Con tenerla disponible en el PATH de la máquina es suficiente.

Arranque

Kafka utiliza Zookeeper, por lo que es necesario tener una instancia de este software arrancada. El propio Kafka proporciona el ejecutable y un script para arrancar una instancia preconfigurada con valores por defecto:

Una vez arrancado Zookeeper se puede arrancar Kafka con otro script y valores por defecto:

Si todo se ejecuta de forma correcta se mostrará en ambos casos el log de ejecución de los procesos por consola.

Cada fichero .properties pasado como parámetro en la línea de comandos contiene los valores de configuración utilizados por defecto para los servidores. Cada entrada de cada fichero está documentada con bastante detalle y en su conjunto resulta sencillo de entender. La documentación proporciona un listado exhaustivo con todos los parámetros de configuración disponibles.

Uno de los parámetros importantes dentro de la configuración es el que establece el directorio donde se almacena la información. Por defecto su valor es /tmp/kafka-logs. Al arrancar el servidor se puede comprobar cómo se crea el directorio y ficheros dentro del mismo.

Tópico

Para crear tópicos se puede utilizar el script kafka-topics  desde línea de comandos con el parámetro --create. Por ejemplo, para crear un tópico de nombre “prueba” con una partición se puede ejecutar la siguiente línea:

Si todo se ejecuta de forma correcta, se mostrará un mensaje confirmando la creación:

Adicionalmente se puede comprobar con el mismo script y el parámetro --list, que lista los tópicos disponibles:

Tal y como indica la documentación, Kafka se puede configurar para crear los tópicos de forma automática si no existen en vez de tener que crearlos de forma manual.

Para los más curiosos del lugar, comentar que estos scripts en realidad se limitan a invocar una clase Java que se encuentra dentro de libs\kafka_<versión de Scala>-<versión de Kafka>.jar.

Publicación/Suscripción

Para publicar mensajes en un tópico se puede utilizar un script que abre un cliente que captura la entrada de teclado y publica las líneas tecleadas como mensajes:

Para suscribirse a un tópico y recibir los mensajes publicados se puede ejecutar otro cliente que se queda a la espera y muestra por pantalla todos los mensajes recibidos:

Si se ejecuta varias veces el último script se puede comprobar que los mensajes son persistidos y que se pueden volver a procesar tantas veces como se quiera.

Como nota al margen, comentar que si se ejecutan los scripts sin parámetros se muestra un texto de ayuda con todas las opciones disponibles.

Cluster

Un ejercicio un poco más sofisticado es configurar Kafka como un clúster. Para el ejemplo se utilizan tres servidores ejecutándose sobre una misma máquina de manera local.

El primer paso es crear un fichero .properties para cada servidor, tomando como punto de partida el fichero por defecto utilizado para arrancar el primer servidor.

El segundo paso es modificar los ficheros para configurar un identificador, puertos y directorio distintos. Para ello hay que editarlos manualmente y sustituir las siguientes entradas:

  • En config/server-1.properties:

  • En config/server-2.properties:

Y el tercer paso es arrancar dos nuevos servidores utilizando los nuevos ficheros de configuración:

A partir de aquí ya se puede empezar a probar el funcionamiento. Por ejemplo, creando un tópico con un factor de replicación de 3:

Comprobando que efectivamente está replicado:

Publicando mensajes:

Matando el proceso del servidor líder:

Y comprobando que los mensajes siguen disponibles a través de un seguidor que ahora se comporta como lider:

Connectors

El Connector API de Kafka permite importar y exportar datos desde diversas fuentes externas. Por ejemplo, desde un fichero de texto plano, o desde el commit log de una base de datos relacional. Es decir, que se puede configurar Kafka para que monitorice un recurso y a medida que se añada información en él se replique en tiempo real en un tópico. O alimentar en tiempo real un recurso a partir de los mensajes publicados en un tópico.

Para el ejemplo demostrativo de esta funcionalidad se configura el cluster del ejemplo anterior para monitorizar el contenido de un fichero de texto plano de nombre test.txt, publicar su contenido a medida que se le añade información en un tópico de nombre “connect-test”, y generar otro fichero de nombre test.sink.txt con la información publicada en el tópico.

El primer paso es crear el fichero de texto test.txt con un par de líneas en el directorio raíz de la instalación:

El segundo paso es instalar los conectores, que son los componentes software que implementan todo el proceso. Se crean dos. El primero para monitorizar el fichero de entrada, y el segundo para generar el fichero de salida:

Y el tercer y último paso es comprobar que el fichero de salida contiene las líneas del fichero de entrada:

Si se abre el fichero de entrada y se añade otra línea, automáticamente se replicará en el fichero de salida. Opcionalmente se puede comprobar que cada línea es publicada como un mensaje en el tópico:

Notar que disponer de los mensajes publicados en un tópico permite además que puedan ser consumidos por más procesos.

Apache Kafka (1)

Cuando se pulsa una tecla en el teclado de un ordenador se produce una interrupción. Lo que quiere decir que el ordenador debe dejar de hacer lo que estaba haciendo para atender dicha interrupción. El sistema operativo recoge el carácter correspondiente a la tecla pulsada y lo notifica a las aplicaciones. Esto permite por ejemplo que una aplicación web pueda indicarle a un navegador que ejecute un determinado trozo de código JavaScript cuando se produzca la pulsación de una tecla.

El término interrupción se utiliza habitualmente para referirse a la acción que sucede a bajo nivel. En un nivel más alto de abstracción estas acciones se conocen como eventos. La mayoría de aplicaciones procesan eventos tales como la pulsación de una tecla o un botón de un ratón de manera local. Algunas además los reenvían a un servidor remoto. El análisis de estos eventos permite conocer de forma muy precisa como interaccionan los usuarios con una aplicación.

Enviar eventos de teclado o ratón a un servidor remoto implica generar una gran cantidad de tráfico. Aunque no por ello deja de ser una réplica del modelo de más bajo nivel. Una aplicación notifica eventos a un servidor de igual forma que un teclado o ratón los notifica a un ordenador. Lo que se puede generalizar diciendo que todo sistema de información puede entenderse como un conjunto de procesos que se comunican generando y respondiendo a eventos. La tecnología o estrategia utilizada para procesar la información puede variar, pero hay patrones recurrentes. Utilizar algún tipo de almacén intermedio para evitar la pérdida de eventos es uno de ellos. El buffer que utiliza un teclado para almacenar las últimas teclas pulsadas es similar al que utiliza un servidor para almacenar los eventos recibidos. Y en este sentido, Apache Kafka puede verse como uno de estos almacenes de eventos.

Kafka

Apache Kafka es una plataforma altamente escalable, creada originalmente por LinkedIn, que facilita el proceso de ingentes cantidades de información en tiempo real. Su caso de uso más habitual es su utilización como sistema de mensajería. Donde la publicación de un mensaje por parte de un proceso representa un evento que se produce en el sistema, conteniendo el mensaje el detalle concreto del evento generado, y permitiendo que una aplicación lo procese con objeto de consumirlo completamente, o lo transforme con objeto de volver a publicarlo como parte de una cadena de procesamiento mayor.

La versión actual de Kafka es la 2.0.0, una major release liberada a finales de julio de este mismo año.

La introducción de tan bajo nivel de los primeros párrafos es para tratar de responder las preguntas habituales acerca de para qué sirve Kafka y cuando se debe utilizar. Se podría utilizar siempre, para procesar todos los eventos generados por un sistema de cualquier tamaño, pero se debe utilizar preferentemente cuando el volumen de información a gestionar, sobre todo en tiempo real, sea enorme. El rendimiento y la escalabilidad son dos de los factores clave que deberían tenerse en cuenta a la hora de tomar la decisión de utilizarlo.

Persistencia

Kafka está diseñado para almacenar y distribuir grandes volúmenes de eventos de forma muy eficiente. Una característica que lo distingue del resto de productos similares es que almacena los eventos de forma duradera. Una aplicación puede suscribirse para recibir los nuevos eventos que se produzcan, o recibir los eventos que se produjeron en un periodo de tiempo concreto dado. Esta característica permite ejecutar procesos sobre eventos generados en el pasado como si se estuvieran produciendo en tiempo real. Y posibilita la repetición a posteriori en caso de error de un mismo proceso con los mismos eventos.

En este punto es importante entender la visión que ofrece Kafka respecto a los eventos. Son las secuencias de información que representan la historia del proceso realizado por un sistema. Almacenando el histórico de eventos consumidos y producidos por un sistema se puede reproducir el proceso realizado por el mismo. Los eventos se pueden consumir en tiempo real, por ejemplo para responder de forma inmediata a las peticiones realizadas por un cliente a través de una aplicación, pero también pueden procesarse con una menor prioridad, por ejemplo para agregar información de cara a su posterior análisis con algún tipo de herramienta a modo de cuadro de mandos. Con esta visión en mente, es fácil entender que Kafka se denomine a sí mismo “structured commit log”, es decir, un sistema de log que almacena los mensajes de forma estructurada para facilitar el tratamiento de los mismos, a diferencia de un log tradicional donde los mensajes son cadenas de texto añadidos en un fichero sin ninguna estructura.

Estructura

Kafka escala horizontalmente gracias a su arquitectura distribuida en forma de cluster con una alta tolerancia a fallos basada en Zookeeper. Un cluster de Kafka almacena información organizada en categorías llamadas tópicos e identificadas por un nombre. Donde cada tópico es un almacén de secuencias ordenadas de una unidad mínima de información denominado registro. Y cada registro es un paquete de información compuesto de una clave, un valor y un timestamp.

Los tópicos se organizan internamente en estructuras llamadas particiones. Estas particiones son similares a las existentes en muchas bases de datos relacionales, permitiendo almacenar los registros agrupados por algún criterio determinado con objeto de mejorar los tiempos de acceso. Los clientes que producen registros eligen la partición en la que deben publicarse estos. Cuando se publica un registro en un tópico se almacena en una partición y se le asigna un offset que indica su posición dentro de la partición. Un offset es un número secuencial único que identifica de forma única a un registro. Una característica de Kafka a este respecto es que los clientes que consumen registros son los que llevan el control de los offsets; cuando se subscriben a un tópico indican el offset del primer registro que quieren recibir. El control lo lleva el cliente, no el servidor, lo que permite simplificar al máximo la lógica del servidor. De hecho, Kafka gestiona los offsets de los consumidores con un tópico, lo que le evita tener que mantener estructuras privadas dedicadas y le permite reutilizar toda la infraestructura existente.

Los registros son persistidos en los tópicos en base a una política de retención configurable. Los registros pueden almacenarse durante horas, días, o indefinidamente según cada caso de uso concreto. Kafka garantiza un tiempo de respuesta constante independientemente del número de registros almacenados.

Todas las lecturas y escrituras sobre una determinada partición son realizadas por un único servidor denominado bróker y que actúa como líder. Por cada líder pueden existir otros brokers actuando como seguidores que replican la partición de forma pasiva. Si el líder de una partición cae entonces uno de sus seguidores se promociona automáticamente a líder haciéndose cargo de gestionar la partición. Incluso es posible replicar un mismo cluster en varios datacenters, ya sea como mecanismo de backup, o con el propósito de tener la información distribuida por regiones geográficas distintas.

Grupos de Consumidores

Un concepto importante en Kafka son los denominados grupos de consumidores. Cada grupo de consumidores está ligado a un tópico, y cada consumidor dentro de un grupo está ligado a una partición. Si todos los consumidores de un tópico pertenecen al mismo grupo entonces los registros se distribuyen de forma que se garantiza que un mismo registro es consumido una única vez por un único consumidor del grupo. Si los consumidores pertenecen a varios grupos entonces un mismo registro se distribuye a un único consumidor de cada grupo.

Esto permite configurar Kafka siguiendo los patrones habituales utilizados en los sistemas de mensajería clásicos. Si se utiliza un único grupo entonces el sistema se comporta como una cola, donde un mensaje es consumido por único proceso. Si se utilizan múltiples grupos entonces el sistema se comporta como un sistema de publicación/suscripción, donde un mensaje es consumido por varios procesos. Tener un único consumidor por cada partición garantiza que los registros se consumen en el mismo orden que se publican, pero implica que Kafka sólo garantiza el orden a nivel de partición, no de tópico.

API

Kafka está escrito en Scala y Java, aunque existen clientes para prácticamente todos los lenguajes de programación de uso más habitual. El cliente para Java es desarrollado por el propio proyecto de forma oficial.

Kafka expone su funcionalidad a través de un protocolo sobre TCP mediante el que oferta servicios agrupados en base a su cometido. Cada grupo de servicios es un API distinto. Cinco de ellos constituyen el núcleo del sistema. Producer API permite publicar registros en los tópicos. Consumer API permite suscribirse a los tópicos y consumir registros. Streams API permite consumir registros de tópicos para volver a publicarlos transformados en otros tópicos. Connector API permite construir pasarelas entre Kafka y otros sistemas, como por ejemplo una base de datos relacional. Y AdminClient API que permite gestionar los distintos componentes que conforman una instancia o cluster de Kafka.

Para el caso de uso de Kafka como un sistema de procesamiento de streams, además del API, existe un proyecto llamado KSQL que se liberó este mismo año y permite escribir procesos que consuman streams con un lenguaje similar a SQL.

Resumiendo, Kafka es una plataforma que aúna distintas características que permiten utilizarlo como un sistema de almacenamiento, un sistema de mensajería, y un sistema de procesamiento de streams muy eficiente y altamente escalable.

Spring Boot 2 (y 5)

Para cerrar el ejemplo de uso básico de Spring Boot 2, en este artículo se revisan algunas formas de empaquetar una aplicación web de cara a su distribución. Decidir utilizar un formato u otro depende de cada caso de uso en concreto, no existe una opción mejor que las demás. En algunos entornos/empresas se sigue requiriendo un WAR/EAR para su despliegue en un servidor de aplicaciones, en otros se requiere un único fichero JAR que se pueda ejecutar con una determinada máquina virtual, y en otros se requiere una imagen de Docker.

War

Spring Boot permite empaquetar una aplicación en un fichero WAR, eliminando el servidor embebido del empaquetado, y que se puede desplegar sobre un servidor de aplicaciones. Lo que quiere decir que lo que se obtiene es un único fichero con la aplicación, con sus dependencias incluidas dentro del propio WAR, o proporcionadas por el servidor de aplicaciones con objeto de ser compartidas por todas las aplicaciones desplegadas en el mismo.

Para generar un WAR hay que cambiar el formato de empaquetado del proyecto en el fichero pom.xml

Para excluir el servidor Tomcat embebido por defecto por Spring Boot dentro de la aplicación hay que añadir una entrada en el fichero pom.xml indicando que las librerías de Tomcat son proporcionadas de forma externa:

Para hacer que la aplicación pueda reaccionar al proceso de inicialización del servidor de aplicaciones, ya que ahora será el servidor y no Spring Boot quien arranque la aplicación, se debe cambiar la clase con el punto de entrada de la aplicación para que extienda de SpringBootServletInitializer y sobreescribir su método configure:

Y por último, para generar el WAR, hay que ejecutar la tarea de empaquetado estándar de Maven:

Como resultado de la ejecución de la tarea se genera un fichero WAR en el directorio \target que puede desplegarse sobre un servidor de aplicaciones de la forma acostumbrada.

A pesar de considerarse un modelo obsoleto, esta forma de distribución se sigue utilizando en muchas empresas. Dejar un fichero en un directorio para que lo desplieguen en un servidor de aplicaciones sigue siendo bastante más habitual de lo que debería.

Nested JARs

Spring Boot permite crear un único fichero JAR que contenga todas las dependencias en un formato propio llamado Nested JARs. Lo que quiere decir que lo que se obtiene es un único fichero, con la aplicación y sus dependencias, y se necesita una máquina virtual para ejecutarlo.

Para generar este tipo de ficheros se debe añadir al fichero pom.xml un plugin de Spring Boot:

Y ejecutar la tarea estándar de empaquetado de Maven:

Como resultado de la ejecución de la tarea se genera un fichero JAR en el directorio \target que puede ejecutarse con la máquina virtual de Java de la forma acostumbrada:

El fichero JAR que genera el plugin de Spring Boot contiene tanto las clases de la aplicación como las librerías de terceros. Es un modelo de empaquetado similar al formato Uber JAR, en el que todas las clases de todas las librerías de terceros se incluyen dentro del JAR como si fueran parte de la aplicación. La diferencia es que Spring Boot incluye los JARs completos, en vez de sólo sus clases, y utiliza un loader propio para poder realizar la carga de dichos JARs.

Es conveniente abrir el JAR generado y examinarlo para entender como está construido. Las clases en el directorio raíz son el cargador de Spring Boot, y las librerías están en un directorio propio llamado BOOT-INF, de forma similar a como se encuentran en el directorio WEB-INF de una aplicación web.

En la práctica, sólo he visto utilizar este modelo de distribución en un proyecto, concretamente para una aplicación de escritorio que se incluía dentro de los terminales de punto de venta de una gran cadena comercial.

Docker

Las aplicaciones empaquetadas en un único fichero son adecuadas para su distribución en entornos cloud. La mayoría de plataformas de computación en la nube permiten configurar una capa por encima de las aplicaciones con la máquina virtual de Java o el servidor de aplicaciones que necesiten.

Si una aplicación se distribuye como un WAR necesita un servidor de aplicaciones y una máquina virtual de Java, pero puede haber problemas en el entorno de producción si hay diferencias entre el software proporcionado por la plataforma y el utilizado en el entorno de desarrollo. De igual forma, si una aplicación se distribuye como un JAR con un servidor embebido, sólo necesita una máquina virtual de Java, pero todavía puede haber problemas si las versiones en los entornos de producción y desarrollo son distintas. Por su parte, en una imagen de Docker se empaqueta tanto la aplicación como todo el software que necesita, consiguiendo que los entornos de desarrollo y producción sean lo más parecidos posibles.

La forma más directa de generar una imagen de Docker es escribir manualmente un fichero de texto plano con nombre Dockerfile, tal cual, sin extensión. En este fichero se indica la imagen base que se quiere utilizar y los pasos necesarios para construir la nueva imagen a partir de la base.

Para nuestro ejemplo necesitamos una imagen que tenga una máquina virtual de Java 10. La imagen oficial proporcionada por OpenJDK en su versión slim está basada en Debian y pesa casi 400Mb, pero para el ejemplo el tamaño no importa.

Spring Boot recomienda utilizar un fichero Dockerfile como el siguiente:

En el fichero se parte de la imagen del OpenJDK, se indica que se cree un directorio temporal, se copie el JAR de la aplicación, y que cuando se ejecute la imagen se levante la maquina virtual de Java para correr la aplicación desde el JAR copiado.

El directorio temporal no es estrictamente necesario, pero Spring Boot recomienda crearlo para garantizar que todas las aplicaciones de Java funcionen correctamente, ya que algunas lo necesitan. El parámetro java.security.egd se configura para hacer que Tomcat arranque más rápido, y lo que hace es definir un fuente de entropía no bloqueante para la generación de números aleatorios necesarios para las librerías de seguridad.

Una vez escrito el fichero Dockerfile lo siguiente es invocar a docker desde línea de comandos para construir la imagen de la forma acostumbrada:

La nueva imagen se listará junto con el resto:

Y se podrá ejecutar por su nombre (tag) haciendo público el puerto en el que se encuentra el servicio:

Si todo el proceso se ejecuta de forma correcta la aplicación arrancará y el servicio estará disponible en el puerto indicado.

Para terminar, comentar que existen otros modelos de distribución a parte de los tres vistos. Uno de ellos es empaquetar la aplicación como un autoejecutable, de forma que pueda ejecutarse sin necesidad de anteponer  java -jar, pero sólo funciona en determinados entornos UNIX.