Skip to content

WebAssembly 101

A lo largo de los años ha habido varios intentos para permitir ejecutar código binario en los navegadores. Los applets de Java, el player de Flash, Silverlight de Microsoft, Native Client de Chrome, o asm.js de Firefox son ejemplos de ello. El pastel era muy grande y todos querían llevarse el trozo más grande. Después de muchos años intentándolo con iniciativas particulares y tecnologías propietarias, al final todas las grandes compañías detrás de los principales navegadores se pusieron de acuerdo para crear una especificación estándar. Esa especificación es WebAssembly. Y lo más importante que se puede decir sobre WebAssembly es precisamente eso, que es una especificación que está siendo definida bajo el paraguas de la W3C.

WebAssembly permite ejecutar aplicaciones en un navegador con un rendimiento similar al que se obtiene ejecutando código nativo.

WebAssembly lleva un año disponible en los principales navegadores, siendo Firefox el que mayor rendimiento ha ofrecido hasta el momento, con un sorprendente buen comportamiento en Edge y un tanto decepcionante en Chrome. O al menos hasta la incorporación de Litoff a las últimas versiones de Chrome. Litoff es una nueva máquina virtual que se ha añadido a Chrome, específicamente para WebAssembly, con la que ha reducido significativamente la brecha que tenía con el resto de navegadores. Porque WebAssembly es precisamente eso, la especificación de una máquina virtual que permite ejecutar código en formato binario de manera segura.

Uno de los comentarios más habituales cuando se empezó a popularizar WebAssembly era el de que a largo plazo sería el lenguaje de estándar de facto de la web que vendría a reemplazar a JavaScript. Lo que ha resultado ser una verdad a medias. Porque WebAssembly no es sólo una máquina virtual, sino también el lenguaje de programación que se ejecuta sobre dicha máquina virtual. Pero es un lenguaje de muy bajo nivel, no se espera a día de hoy que los desarrolladores escriban código utilizándolo. De hecho, lo que se espera es precisamente todo lo contrario, es decir, que se utilicen otros lenguajes de programación, tradicionalmente utilizados para desarrollos en la parte servidora, como por ejemplo C++ o Go. Afirmación que puede resultar algo sorprendente, pero que se fundamenta en el hecho de que el conjunto de instrucciones que define WebAssembly es muy básico, por lo que es factible compilar código escrito en un lenguaje de alto nivel y generar el código equivalente escrito en WebAssembly. Y esto de hecho es ya una realidad, muchos lenguajes de programación permite compilar actualmente a WebAssembly.

Para entender el impacto de WebAssembly en la web hay que tener en cuenta el objetivo que pretende alcanzar. Y este no es otro que proporcionar un entorno seguro y de alto rendimiento para la ejecución de aplicaciones. Un objetivo realmente ambicioso. La pieza que faltaba para que los navegadores puedan venir a eliminar prácticamente toda interacción de los usuarios con el sistema operativo subyacente. Una afirmación un tanto exagerada quizás, hasta que se empieza a ver ejemplos de lo que algunos equipos de desarrollo están consiguiendo con WebAssembly, siendo AutoCAD posiblemente uno de los ejemplos más paradigmáticos.

AutoCAD es una veterana aplicación para diseño 3D muy popular en entornos profesionales. AutoDesk, su empresa desarrolladora, lleva años intentando llevar su software a la web, y tal como ellos mismos afirman, no ha sido hasta la aparición de WebAssembly que sienten que realmente lo han conseguido. Partiendo de un mismo código base escrito en C++ ahora son capaces de compilar a nativo o a WebAssembly, de forma que su producto puede ejecutarse como una aplicación de escritorio nativa tradicional o distribuirse como una aplicación web ordinaria accesible desde cualquier dispositivo dotado de un navegador. Esto quiere decir que ya no es necesario crear ejecutables para los distintos sistemas operativos y arquitecturas hardware, la versión en WebAssembly se ejecuta sobre cualquier navegador. Y además lo hace de forma eficiente y segura.

Como ya se ha comentando, WebAssembly es la especificación de una máquina virtual. Su propósito es definir un conjunto de instrucciones, un contexto de ejecución, y las reglas que rigen la ejecución de dichas instrucciones en dicho contexto. El conjunto de instrucciones define como realizar operaciones o modificar el flujo de proceso. El contexto de ejecución define las capacidades disponibles para la ejecución de las instrucciones. Y las reglas definen como validar las instrucciones de cara a su ejecución.

Algunas de las características principales de la máquina virtual de WebAssembly son las siguientes:

  • Define un conjunto de instrucciones. En dos formatos. Formato de texto para facilitar la lectura/escritura de código. Y formato binario equivalente, un opcode para cada instrucción, para facilitar la compilación y ejecución del código. El número total de instrucciones disponibles actualmente es pequeño, menos de 256, por lo que 1 byte basta representar todas las instrucciones.
  • Basa su funcionamiento en una pila (stack) estructurada. Todas las instrucciones extraen sus argumentos de la pila y almacenan el resultado de su ejecución en la pila. La elección de una estructura de pila simplifica la gramática de las instrucciones, permite una compilación más directa, y facilita la implementación de algunos patrones, como por ejemplo retornar múltiples valores desde una función.
  • Trabaja con tipos básicos. Los únicos tipos soportados son enteros de 32 bits, enteros de 64 bits, decimales de 32 bits y decimales de 64 bits. Estos tipos son soportados por todas las arquitecturas de hardware actuales. Tipos más pequeños, como los enteros de 8 o 16 bits, en la práctica son tratados de manera interna como enteros de 32 bits por la mayoría de los lenguajes de programación en tiempo de ejecución.
  • Opera sobre un bloque de memoria de tamaño predeterminado, múltiplo de 64 KB, accedido de forma lineal, y direccionable a byte. WebAssembly especifica que la memoria debe reservarse en forma de sandbox, es decir, como una región de memoria reservada de forma expresa para ello, no sobre una región de memoria reservada previamente para otro propósito, ni tan siquiera sobre la memoria reservada para la pila.
  • Los programas se organizan en módulos. Los módulos en secciones. Una sección de código contiene funciones, de la misma forma que se entiende en la mayoría de lenguajes de alto nivel, y pueden definir variables locales. Estas variables se almacenan en su propia pila, alojada fuera del espacio de memoria ordinaria direccionable por las instrucciones. Mantener los valores de las variables locales en su propio espacio de memoria aumenta aún más la seguridad del sistema. Y permite tratar la máquina virtual como si tuviera un número infinito de registros.
  • Es determinista. Al menos en la medida de lo posible. Incluso aunque ello vaya en contra de uno de sus objetivos de diseño. Por ejemplo, muchos lenguajes de programación soportan cierto grado de indeterminismo, como las estructuras de tamaño variable en función de la plataforma hardware. Si se quiere compilar estos lenguajes a WebAssembly, los compiladores tendrán que generar el código necesario para adaptarse al comportamiento determinista definido por WebAssembly, aún a costa de una mayor cantidad de código generado o una ligera pérdida de rendimiento, lo que va en contra de los objetivos de diseño de WebAssembly, que establecen que se debe facilitar el proceso de compilación eficiente.

Otro aspecto importante a tener en cuenta es que, aunque WebAssembly está pensado para ejecutarse embebido en el contexto de un navegador web, en la práctica es totalmente independiente y una implementación de la máquina virtual es factible en cualquier entorno. De hecho, ya se está utilizando la especificación como base para construir microkernels que ejecutan WebAssembly.

Por supuesto el caso de uso más habitual a día de hoy es utilizar WebAssembly dentro de un navegador, por lo que la especificación define un API para JavaScript que permite cargar, ejecutar módulos WebAssembly, y comunicarse con código JavaScript. La compilación y verificación del código WebAssembly es muy rápida, se realiza a medida que el código se descarga, sin tener que esperar que se descargue completamente. Y los navegadores ya empiezan a soportan depurar el código en WebAssembly de igual forma que el código en JavaScript ordinario con la ayuda de sourcemaps.

Por último, comentar que la adopción de WebAssembly es un hecho. Por ejemplo, Unity, el popular motor multiplataforma para la creación de videojuegos, ya permite generar código en WebAssembly para la web. Combinado con WebGL, proporciona una experiencia similar a los videojuegos escritos en código nativo. En un futuro es posible incluso que aparezcan frameworks de alto rendimiento para la creación de aplicaciones web que utilicen un elemento de tipo canvas para el apartado gráfico en vez de utilizar DOM y CSS.

Mi contribución a la causa es una extensión de WebAssembly para Visual Studio Code que publiqué hace unos meses y añade coloreado de sintaxis al código escrito en WebAssembly.

Apache Kafka (y 4)

En este artículo se muestra un ejemplo de construcción de un cliente Java para Apache Kafka que utiliza Kafka Streams. Al igual que en artículos anteriores de esta serie, la documentación oficial representa un buen punto de partida, por lo que sigue el guión propuesto en la misma.

Preparación

Para la ejecución de este ejemplo se presupone que se encuentra arrancado un servidor local de Kafka con la configuración por defecto según lo explicado en artículos anteriores:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

bin\windows\kafka-server-start.bat config\server.properties

El ejemplo leerá de un tópico de entrada y escribirá en un tópico de salida. Más concretamente leerá de un stream de entrada líneas de texto plano y escribirá en un stream de salida las distintas palabras utilizadas junto con el número total de ocurrencias de cada una de ellas.

El tópico de entrada se llamará “streams-plaintext-input” y puede crearse desde línea de comandos con la siguiente instrucción:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 
  --replication-factor 1 --partitions 1 --topic streams-plaintext-input

El tópico de salida se llamará “streams-wordcount-output” y puede crearse desde línea de comandos con la siguiente instrucción:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
  --replication-factor 1 --partitions 1 --topic streams-wordcount-output
  --config cleanup.policy=compact

Lo único reseñable en este paso es que el segundo tópico se crea con una política de compactación. Lo que quiere decir que Kafka garantiza que el último valor asignado a cada clave estará disponible, pero no necesariamente el histórico de todos los valores asignados a cada clave. Entendiendo que en este caso concreto que las claves serán las distintas palabras encontradas en el texto, y los valores el número de ocurrencias de las mismas.

Maven

El API de Kafka Streams está publicado en Maven bajo las siguientes coordenadas:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.0.0</version>
</dependency>

Kafka Streams

El API de Kafka Streams permite crear clientes que procesen streams.

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountDemo {

  public static void main(String[] args) throws Exception {
    // Define la propiedades de conexión
    var props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
      Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
      Serdes.String().getClass());

    // Conecta al tópico de entrada
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("streams-plaintext-input");

    // Define el proceso para agrupar la entrada y alimentar la salida
    source.flatMapValues(
      value -> Arrays.asList(
        value.toLowerCase(Locale.getDefault()).split("\\W+")))

      .groupBy((key, value) -> value)

      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(
        "counts-store")).toStream()

      .to("streams-wordcount-output", 
        Produced.with(Serdes.String(), Serdes.Long()));

    final var topology = builder.build();
    final var streams = new KafkaStreams(topology, props);

    // Espera Ctrl+C para terminar en orden
    final var latch = new CountDownLatch(1);

    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close();
        latch.countDown();
      }
    });

    // Inicia el proceso
    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }

    System.exit(0);
  }
}

El código del ejemplo, un poco más extenso de lo habitual, define las propiedades de la conexión, se conecta al tópico de entrada, define la operación a realizar para alimentar el tópico de salida, arranca el proceso, y se queda a la espera hasta que se pulsa Ctrl+C para terminar en orden. Lo interesante es notar que se utiliza el estilo del API nativo de Java para la manipulación de Streams. El resto del API de Kafka apenas consiste en llamar a los métodos start y close.

Este ejemplo se incluye dentro de la propia distribución de Kafka, así que puede arrancarse directamente desde línea de comandos con la siguiente instrucción:

bin\windows\kafka-run-class.bat
  org.apache.kafka.streams.examples.wordcount.WordCountDemo

La aplicación se queda a la espera de que se alimente el tópico de entrada, algo que puede hacerse creando un productor con la siguiente línea de comandos:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092
  --topic streams-plaintext-input

El productor se queda esperando a que se introduzca algún texto por teclado, de forma que todas las líneas introducidas por teclado se envían al tópico “streams-plaintext-input”.

El último paso es crear un consumidor que examine el tópico de salida, algo que puede hacerse con la siguiente línea de comandos:

bin\windows\kafka-console-consumer.bat
  --bootstrap-server localhost:9092
  --topic streams-wordcount-output
  --from-beginning
  --formatter kafka.tools.DefaultMessageFormatter
  --property print.key=true
  --property print.value=true
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

El consumidor muestra por pantalla el contenido del tópico “streams-wordcount-output” a medida que se va almacenando información en el mismo.

Si todo ha ido correctamente, cuando se introduzca algún texto en el stream de entrada, deberá verse el conteo de palabras en el stream de salida actualizado.

Apache Kafka (3)

En este artículo se muestran un par de ejemplos de uso de la librería cliente para Java de Apache Kafka. El objetivo es crear clientes básicos que actúen como productores o consumidores, que son dos de los casos de uso más habituales. Los ejemplos de la documentación oficial proporcionan un punto de partida bastante sólido, por lo que este artículo sigue el guión propuesto en los mismos.

Para ejecutar los ejemplos se debe tener levantado el cluster creado en el artículo anterior con un tópico de nombre “prueba”.

Maven

La librería cliente de Kafka está distribuida en Maven bajo las siguientes coordernadas:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>

Producer API

Este API permite crear clientes que publiquen registros.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Productor {

  public static void main(String[] args) {
    var props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("buffer.memory", 33554432);
    props.put("linger.ms", 1);
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");

    var producer = new KafkaProducer<String, String>(props);

    producer.send(new ProducerRecord<>("prueba", "Sherlock", "Holmes"));
    producer.send(new ProducerRecord<>("prueba", "Hercule", "Poirot"));

    producer.close();
  }
}

El código de ejemplo anterior crea un productor que genera dos registros que publica en el tópico “prueba”. Cada registro compuesto de una clave y valor, ambos de tipo String.

La clase principal es KafkaProducer. Una clase thread safe reutilizable que admite una configuración con una gran cantidad de opciones, por lo que el método preferido para instanciarla es pasarle el clásico objeto de tipo Properties de Java en el constructor con todos los parámetros.

El productor funciona de forma asíncrona no bloqueante. La llamada al método send no envía los registros al servidor de forma inmediata, los almacena de forma local. Los registros se envían al servidor en un thread aparte en función de la configuración. Esta estrategia permite reducir el tráfico de red, enviando varios registros en un solo paquete, y actuando acorde al caso de uso habitual de los productores, que notifican eventos de forma masiva al servidor. No obstante, un punto importante de esta forma de funcionamiento es recordar llamar al método close, o flush, para garantizar que todos los registros almacenados de forma local se envían al servidor antes de terminar la conexión.

El método send retorna un Future que se completa con información acerca del envío. En dicho Future se incluye el tópico, el timestamp y el offset asignado al mensaje. Si se quiere esperar de forma síncrona bloqueante a que se complete el envío se puede llamar al métogo get del Future. Otra opción es pasar una función callback como parámetro al método send. En este último caso el envío será síncrono y la función callback se invocará cuando se complete el envío.

En cuanto a los parámetros de configuración, en el ejemplo se utilizan algunos de ellos. “acks” establece si el productor necesita que se confirme la escritura efectiva del registro en el tópico, el valor “0” indica que no se requiere ninguna confirmación, el valor “1” indica que se debe garantizar que el bróker líder del cluster lo persiste, y “all” indica que se debe garantizar la escritura efectiva por todos los brokers del cluster. “retries” indica el número de reintentos a realizar en caso de pérdida de conexión, se utiliza cero porque un número mayor puede dar lugar a registros duplicados en determinados escenarios como se verá más adelante. “batch.size” indica el número máximo de registros que se pueden acumular en local antes de su envío para cada partición, a más registros más memoria. “buffer.memory” indica el tamaño máximo de memoria que se debe utilizar para almacenar los registros de manera local, si se alcanza dicho máximo el método send se bloqueará hasta que vuelva a haber memoria disponible. “linger.ms” establece un tiempo de espera para enviar registros al servidor, lo que permite que se almacenen registros de forma local para enviarlos todos en un solo paquete de forma más eficiente. “key.serializer” y “value.serializer” indican las clases que deben utilizarse para serializar los valores de las claves y los valores en los registros.

Como se ha comentado anteriormente, en escenarios concretos de error o sobrecarga en el tráfico de red puede ocurrir que se dupliquen registros si se establece un número de reintentos mayor de cero. Para evitar este efecto no deseado, el productor puede configurarse estableciendo el parámetro “enable.idempotence” a “true” para trabajar en modo idempotente, un modo que garantiza que los registros no se duplican. Pero tampoco es la panacea, la garantía sólo se establece para una conexión concreta de un productor concreto. El funcionamiento de este modo se basa en la generación de un ID único para cada productor y un secuencial con el número de registros publicados. El servidor puede ignorar mensajes con secuenciales menores al último persistido con éxito, evitando así que se vuelva a publicar un mensaje ya publicado antes de una pérdida de conexión.

El productor también puede utilizarse para generar eventos dentro de una transacción, garantizando que se escriben todos, o ninguno, de los registros publicados dentro de los límites de la transacción. Aunque este modo de funcionamiento establece algunas restricciones sobre la configuración de los tópicos, que deben tener un factor de replicación de tres o más, y exigen un mínimo de dos réplicas sincronizadas. Y de igual forma exige que los consumidores se configuren para consumir exclusivamente registros completamente persistidos y sincronizados en el cluster. El API para este modo es síncrono y eleva excepciones en caso de error.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class ProductorTransaccional {

  public static void main(String[] args) {
    var props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");

    var producer = new KafkaProducer<String, String>(props);

    producer.initTransactions();

    try {
      producer.beginTransaction();

      producer.send(new ProducerRecord<>("prueba", "Miss", "Marple"));
      producer.send(new ProducerRecord<>("prueba", "Philip", "Marlowe"));

      producer.commitTransaction();
    } catch (ProducerFencedException | 
             OutOfOrderSequenceException | 
             AuthorizationException e) {
      producer.close();
    } catch (KafkaException e) {
      producer.abortTransaction();
    }

    producer.close();
  }
}

En el código de ejemplo anterior se crea un productor que publica dos registros dentro de una transacción. El patrón que sigue es el uso de los clásicos métodos begin y commit para delimitar la transacción. El método initTransactions garantiza que no se inicia una transacción nueva sin haber terminado la anterior. Y el método abortTransaction se utiliza para abortar la transacción en curso en caso de error.

Consumer API

Este API permite crear clientes que consuman registros.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumidor {

  public static void main(String[] args) {
    var props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "grupo-prueba");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");

    var consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("prueba"));

    while (true) {
      var records = consumer.poll(Duration.ofMillis(1000));

      for (var record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", 
          record.offset(), record.key(), record.value());
      }
    }
  }
}

El código de ejemplo anterior crea un consumidor que consume los registros publicados en el tópico “prueba”. La clase principal es KafkaConsumer , que a diferencia de la clase del productor, NO es thread safe.

Un aspecto importante a tener en cuenta para la programación de consumidores es la noción de offset. Como ya se explicó en artículos anteriores, un offset es un secuencial que indica la posición de un mensaje dentro de un tópico. Los consumidores mantienen dos offsets, por una parte el offset del próximo mensaje que esperan recibir, y por otra parte el offset del último mensaje consumido de forma efectiva. Es decir, que el cliente puede decidir e indicar al servidor el offset del último mensaje que considera consumido. Más concretamente, llamando de forma manual al método commitSync o commitAsync. En caso de caída del consumidor, el servidor volverá a enviarle mensajes, cuando se vuelva a levantar, a partir del último offset confirmado.

Otro aspecto importante, también comentado en artículos anteriores, es el concepto de grupo de consumidores. Cada consumidor dentro de un grupo es asignado por el servidor a una partición de un tópico. Es decir, que los mensajes de un tópico son consumidos por un único consumidor. Si el consumidor cae, el servidor asignará automáticamente la partición a otro consumidor del grupo de consumidores. Si todos los consumidores de un mismo tópico se adhieren al mismo grupo de consumidores entonces Kakfa se comporta como una cola, de igual forma que la mayoría de sistemas de mensajería, donde un mensaje es consumido por un único cliente. Si cada consumidor de un mismo tópico tiene un identificador de grupo de consumidores distinto entonces Kakfa se comporta como un mecanismo de publicación/suscripción, donde cada mensaje es consumido por todos los clientes.

La clase KafkaConsumer permite asignar un consumidor a una partición concreta, recibir notificaciones cuando se produzca un cambio en la asignación de particiones, y realizar otras tareas relacionadas con la gestión de grupos de consumidores.

Respecto a los parámetros de configuración, estos se establecen a través de una instancia de la clase Properties como en la clase del productor. Los parámetros más relevantes en el ejemplo son “group.id”, que contiene el nombre del grupo de consumidores, junto con “enable.auto.commit” y “auto.commit.interval.ms”, que indican que el consumidor quiere que se comunique al servidor cada N milisegundos que los mensajes recibidos se consideran completamente consumidos de forma automática, sin que el consumidor tenga que llamar manualmente a commitSync o commitAsync.

El método poll consume mensajes del servidor. Si hay mensajes disponibles retorna inmediatamente dichos mensajes. Si no hay mensajes disponibles se bloquea el tiempo indicado. Si llegan mensajes durante la espera se desbloquea y retorna los mensajes recibidos. Si no llegan mensajes durante la espera retorna una lista vacía al expirar el tiempo indicado. Opcionalmente se puede indicar a través de la propiedad “max.poll.records” el número máximo de mensajes que el cliente quiere procesar de una sola vez. A diferencia de la mayoría de plataformas, donde el servidor tiene la responsabilidad de controlar todo lo que hacen sus clientes, Kafka utiliza una estrategia distinta, con clientes que implementan bastante lógica de control.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

public class ConsumidorCommit {

  public static void main(String[] args) {
    var props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "grupo-prueba");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");

    var consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Collections.singletonList("prueba"));

    while (true) {
      var records = consumer.poll(Duration.ofMillis(1000));

      for (var partition : records.partitions()) {
        var partitionRecords = records.records(partition);

        for (var record : partitionRecords) {
          System.out.println(record.offset() + ": " + record.value());
        }

        var lastOffset = partitionRecords.get(partitionRecords.size() - 1)
          .offset();
        consumer.commitSync(Collections.singletonMap(partition, 
          new OffsetAndMetadata(lastOffset + 1)));
      }
    }
  }
}

En el código de ejemplo anterior se crea un consumidor que consume mensajes y decide de forma manual cuando han sido efectivamente consumidos llamando a commitSync con el offset del último mensaje consumido, o más precisamente, con el offset del siguiente mensaje, ya que el método requiere sumar 1 al último offset procesado. En cualquier caso, lo que hay que tener claro es que nunca se puede garantizar un 100% de consistencia. Por ejemplo, el proceso podría caer antes de llamar a commitSync. En cuyo caso algunos mensajes se habrían procesado, pero no le llegaría la notificación a Kafka. Al volver a arrancar el consumidor, Kafka volvería a enviar los mismos últimos mensajes no confirmados por el consumidor.

Por último, comentar que la clase KafkaConsumer ofrece métodos como seek, seekToBeginning y seekToEnd para dejar que el consumidor especifique de forma precisa la posición a partir de la cual quiere consumir mensajes, permitiendo por ejemplo volver a consumir mensajes ya procesados, o ignorar mensajes y sólo consumir los nuevos.

Apache Kafka (2)

Continuando con la serie dedicada a Apache Kafka, en este artículo se revisa el proceso de instalación y algunas de las tareas básicas que pueden realizarse para familiarizarse con el software. La documentación es bastante efectiva a este respecto, por lo que este artículo se limita a seguir paso por paso el guión propuesto en la misma. El objetivo es instalar y verificar el funcionamiento de Kakfa de forma local en una máquina Windows partiendo de cero y sin utilizar Docker.

Descarga

La versión actual de Kafka en el momento de escribir este artículo es la 2.0.0. Puede descargarse desde la propia página de descargas oficial del proyecto. El descargable es un fichero comprimido en formato .tgz que puede descomprimirse en cualquier directorio.

Instalación

El software descomprimido se distribuye en cuatro directorios. El directorio bin contiene scripts para ser ejecutados desde línea de comandos, tanto para Windows en formato .bat como para UNIX en formato .sh. El directorio config contiene los ficheros de configuración en formato .properties. El directorio libs contiene las dependencias en forma de librerías de Java en formato .jar. Y el directorio site-docs contiene toda la documentación comprimida en formato .tgz.

El único requerimiento para ejecutar Kafka es tener instalada una máquina virtual de Java 1.8 o superior. Con tenerla disponible en el PATH de la máquina es suficiente.

Arranque

Kafka utiliza Zookeeper, por lo que es necesario tener una instancia de este software arrancada. El propio Kafka proporciona el ejecutable y un script para arrancar una instancia preconfigurada con valores por defecto:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Una vez arrancado Zookeeper se puede arrancar Kafka con otro script y valores por defecto:

bin\windows\kafka-server-start.bat config\server.properties

Si todo se ejecuta de forma correcta se mostrará en ambos casos el log de ejecución de los procesos por consola.

Cada fichero .properties pasado como parámetro en la línea de comandos contiene los valores de configuración utilizados por defecto para los servidores. Cada entrada de cada fichero está documentada con bastante detalle y en su conjunto resulta sencillo de entender. La documentación proporciona un listado exhaustivo con todos los parámetros de configuración disponibles.

Uno de los parámetros importantes dentro de la configuración es el que establece el directorio donde se almacena la información. Por defecto su valor es /tmp/kafka-logs. Al arrancar el servidor se puede comprobar cómo se crea el directorio y ficheros dentro del mismo.

Tópico

Para crear tópicos se puede utilizar el script kafka-topics  desde línea de comandos con el parámetro –create. Por ejemplo, para crear un tópico de nombre “prueba” con una partición se puede ejecutar la siguiente línea:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 
  --replication-factor 1 --partitions 1 --topic prueba

Si todo se ejecuta de forma correcta, se mostrará un mensaje confirmando la creación:

Created topic "prueba".

Adicionalmente se puede comprobar con el mismo script y el parámetro –list, que lista los tópicos disponibles:

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

Tal y como indica la documentación, Kafka se puede configurar para crear los tópicos de forma automática si no existen en vez de tener que crearlos de forma manual.

Para los más curiosos del lugar, comentar que estos scripts en realidad se limitan a invocar una clase Java que se encuentra dentro de libs\kafka_<versión de Scala>-<versión de Kafka>.jar.

Publicación/Suscripción

Para publicar mensajes en un tópico se puede utilizar un script que abre un cliente que captura la entrada de teclado y publica las líneas tecleadas como mensajes:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092
  --topic prueba

> uno
> dos
> tres

Para suscribirse a un tópico y recibir los mensajes publicados se puede ejecutar otro cliente que se queda a la espera y muestra por pantalla todos los mensajes recibidos:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
  --topic prueba --from-beginning

uno
dos
tres

Si se ejecuta varias veces el último script se puede comprobar que los mensajes son persistidos y que se pueden volver a procesar tantas veces como se quiera.

Como nota al margen, comentar que si se ejecutan los scripts sin parámetros se muestra un texto de ayuda con todas las opciones disponibles.

Cluster

Un ejercicio un poco más sofisticado es configurar Kafka como un clúster. Para el ejemplo se utilizan tres servidores ejecutándose sobre una misma máquina de manera local.

El primer paso es crear un fichero .properties para cada servidor, tomando como punto de partida el fichero por defecto utilizado para arrancar el primer servidor.

copy config\server.properties config\server-1.properties
copy config\server.properties config\server-2.properties

El segundo paso es modificar los ficheros para configurar un identificador, puertos y directorio distintos. Para ello hay que editarlos manualmente y sustituir las siguientes entradas:

  • En config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
  • En config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

Y el tercer paso es arrancar dos nuevos servidores utilizando los nuevos ficheros de configuración:

bin\windows\kafka-server-start.bat config\server-1.properties
bin\windows\kafka-server-start.bat config\server-2.properties

A partir de aquí ya se puede empezar a probar el funcionamiento. Por ejemplo, creando un tópico con un factor de replicación de 3:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
  --topic replicado --partitions 1 --replication-factor 3

Comprobando que efectivamente está replicado:

bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181
  --topic replicado

Topic:replicado PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicado Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

Publicando mensajes:

bin\windows\kafka-console-producer.bat --broker-list localhost:9092
  --topic replicado

>Mechor
>Gaspar
>Baltasar
^C

Matando el proceso del servidor líder:

wmic process where "caption = 'java.exe' and
  commandline like '%server-1.properties%'" get processid

ProcessId
NNNN

taskkill /pid NNNN /f

Correcto: se terminó el proceso con PID NNNN.

Y comprobando que los mensajes siguen disponibles a través de un seguidor que ahora se comporta como lider:

bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181
  --topic replicado

Topic:replicado PartitionCount:1 ReplicationFactor:3 Configs:
Topic: replicado Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
  --topic replicado --from-beginning

Melchor
Gaspar
Baltasar
^C

Connectors

El Connector API de Kafka permite importar y exportar datos desde diversas fuentes externas. Por ejemplo, desde un fichero de texto plano, o desde el commit log de una base de datos relacional. Es decir, que se puede configurar Kafka para que monitorice un recurso y a medida que se añada información en él se replique en tiempo real en un tópico. O alimentar en tiempo real un recurso a partir de los mensajes publicados en un tópico.

Para el ejemplo demostrativo de esta funcionalidad se configura el cluster del ejemplo anterior para monitorizar el contenido de un fichero de texto plano de nombre test.txt, publicar su contenido a medida que se le añade información en un tópico de nombre «connect-test», y generar otro fichero de nombre test.sink.txt con la información publicada en el tópico.

El primer paso es crear el fichero de texto test.txt con un par de líneas en el directorio raíz de la instalación:

Quijote
Sancho

El segundo paso es instalar los conectores, que son los componentes software que implementan todo el proceso. Se crean dos. El primero para monitorizar el fichero de entrada, y el segundo para generar el fichero de salida:

bin\windows\connect-standalone.bat config\connect-standalone.properties 

config\connect-file-source.properties config\connect-file-sink.properties

Y el tercer y último paso es comprobar que el fichero de salida contiene las líneas del fichero de entrada:

more test.sink.txt

Quijote
Sancho

Si se abre el fichero de entrada y se añade otra línea, automáticamente se replicará en el fichero de salida. Opcionalmente se puede comprobar que cada línea es publicada como un mensaje en el tópico:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
  --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"Quijote"}
{"schema":{"type":"string","optional":false},"payload":"Sancho"}

Notar que disponer de los mensajes publicados en un tópico permite además que puedan ser consumidos por más procesos.

Apache Kafka (1)

Cuando se pulsa una tecla en el teclado de un ordenador se produce una interrupción. Lo que quiere decir que el ordenador debe dejar de hacer lo que estaba haciendo para atender dicha interrupción. El sistema operativo recoge el carácter correspondiente a la tecla pulsada y lo notifica a las aplicaciones. Esto permite por ejemplo que una aplicación web pueda indicarle a un navegador que ejecute un determinado trozo de código JavaScript cuando se produzca la pulsación de una tecla.

El término interrupción se utiliza habitualmente para referirse a la acción que sucede a bajo nivel. En un nivel más alto de abstracción estas acciones se conocen como eventos. La mayoría de aplicaciones procesan eventos tales como la pulsación de una tecla o un botón de un ratón de manera local. Algunas además los reenvían a un servidor remoto. El análisis de estos eventos permite conocer de forma muy precisa como interaccionan los usuarios con una aplicación.

Enviar eventos de teclado o ratón a un servidor remoto implica generar una gran cantidad de tráfico. Aunque no por ello deja de ser una réplica del modelo de más bajo nivel. Una aplicación notifica eventos a un servidor de igual forma que un teclado o ratón los notifica a un ordenador. Lo que se puede generalizar diciendo que todo sistema de información puede entenderse como un conjunto de procesos que se comunican generando y respondiendo a eventos. La tecnología o estrategia utilizada para procesar la información puede variar, pero hay patrones recurrentes. Utilizar algún tipo de almacén intermedio para evitar la pérdida de eventos es uno de ellos. El buffer que utiliza un teclado para almacenar las últimas teclas pulsadas es similar al que utiliza un servidor para almacenar los eventos recibidos. Y en este sentido, Apache Kafka puede verse como uno de estos almacenes de eventos.

Kafka

Apache Kafka es una plataforma altamente escalable, creada originalmente por LinkedIn, que facilita el proceso de ingentes cantidades de información en tiempo real. Su caso de uso más habitual es su utilización como sistema de mensajería. Donde la publicación de un mensaje por parte de un proceso representa un evento que se produce en el sistema, conteniendo el mensaje el detalle concreto del evento generado, y permitiendo que una aplicación lo procese con objeto de consumirlo completamente, o lo transforme con objeto de volver a publicarlo como parte de una cadena de procesamiento mayor.

La versión actual de Kafka es la 2.0.0, una major release liberada a finales de julio de este mismo año.

La introducción de tan bajo nivel de los primeros párrafos es para tratar de responder las preguntas habituales acerca de para qué sirve Kafka y cuando se debe utilizar. Se podría utilizar siempre, para procesar todos los eventos generados por un sistema de cualquier tamaño, pero se debe utilizar preferentemente cuando el volumen de información a gestionar, sobre todo en tiempo real, sea enorme. El rendimiento y la escalabilidad son dos de los factores clave que deberían tenerse en cuenta a la hora de tomar la decisión de utilizarlo.

Persistencia

Kafka está diseñado para almacenar y distribuir grandes volúmenes de eventos de forma muy eficiente. Una característica que lo distingue del resto de productos similares es que almacena los eventos de forma duradera. Una aplicación puede suscribirse para recibir los nuevos eventos que se produzcan, o recibir los eventos que se produjeron en un periodo de tiempo concreto dado. Esta característica permite ejecutar procesos sobre eventos generados en el pasado como si se estuvieran produciendo en tiempo real. Y posibilita la repetición a posteriori en caso de error de un mismo proceso con los mismos eventos.

En este punto es importante entender la visión que ofrece Kafka respecto a los eventos. Son las secuencias de información que representan la historia del proceso realizado por un sistema. Almacenando el histórico de eventos consumidos y producidos por un sistema se puede reproducir el proceso realizado por el mismo. Los eventos se pueden consumir en tiempo real, por ejemplo para responder de forma inmediata a las peticiones realizadas por un cliente a través de una aplicación, pero también pueden procesarse con una menor prioridad, por ejemplo para agregar información de cara a su posterior análisis con algún tipo de herramienta a modo de cuadro de mandos. Con esta visión en mente, es fácil entender que Kafka se denomine a sí mismo “structured commit log”, es decir, un sistema de log que almacena los mensajes de forma estructurada para facilitar el tratamiento de los mismos, a diferencia de un log tradicional donde los mensajes son cadenas de texto añadidos en un fichero sin ninguna estructura.

Estructura

Kafka escala horizontalmente gracias a su arquitectura distribuida en forma de cluster con una alta tolerancia a fallos basada en Zookeeper. Un cluster de Kafka almacena información organizada en categorías llamadas tópicos e identificadas por un nombre. Donde cada tópico es un almacén de secuencias ordenadas de una unidad mínima de información denominado registro. Y cada registro es un paquete de información compuesto de una clave, un valor y un timestamp.

Los tópicos se organizan internamente en estructuras llamadas particiones. Estas particiones son similares a las existentes en muchas bases de datos relacionales, permitiendo almacenar los registros agrupados por algún criterio determinado con objeto de mejorar los tiempos de acceso. Los clientes que producen registros eligen la partición en la que deben publicarse estos. Cuando se publica un registro en un tópico se almacena en una partición y se le asigna un offset que indica su posición dentro de la partición. Un offset es un número secuencial único que identifica de forma única a un registro. Una característica de Kafka a este respecto es que los clientes que consumen registros son los que llevan el control de los offsets; cuando se subscriben a un tópico indican el offset del primer registro que quieren recibir. El control lo lleva el cliente, no el servidor, lo que permite simplificar al máximo la lógica del servidor. De hecho, Kafka gestiona los offsets de los consumidores con un tópico, lo que le evita tener que mantener estructuras privadas dedicadas y le permite reutilizar toda la infraestructura existente.

Los registros son persistidos en los tópicos en base a una política de retención configurable. Los registros pueden almacenarse durante horas, días, o indefinidamente según cada caso de uso concreto. Kafka garantiza un tiempo de respuesta constante independientemente del número de registros almacenados.

Todas las lecturas y escrituras sobre una determinada partición son realizadas por un único servidor denominado bróker y que actúa como líder. Por cada líder pueden existir otros brokers actuando como seguidores que replican la partición de forma pasiva. Si el líder de una partición cae entonces uno de sus seguidores se promociona automáticamente a líder haciéndose cargo de gestionar la partición. Incluso es posible replicar un mismo cluster en varios datacenters, ya sea como mecanismo de backup, o con el propósito de tener la información distribuida por regiones geográficas distintas.

Grupos de Consumidores

Un concepto importante en Kafka son los denominados grupos de consumidores. Cada grupo de consumidores está ligado a un tópico, y cada consumidor dentro de un grupo está ligado a una partición. Si todos los consumidores de un tópico pertenecen al mismo grupo entonces los registros se distribuyen de forma que se garantiza que un mismo registro es consumido una única vez por un único consumidor del grupo. Si los consumidores pertenecen a varios grupos entonces un mismo registro se distribuye a un único consumidor de cada grupo.

Esto permite configurar Kafka siguiendo los patrones habituales utilizados en los sistemas de mensajería clásicos. Si se utiliza un único grupo entonces el sistema se comporta como una cola, donde un mensaje es consumido por único proceso. Si se utilizan múltiples grupos entonces el sistema se comporta como un sistema de publicación/suscripción, donde un mensaje es consumido por varios procesos. Tener un único consumidor por cada partición garantiza que los registros se consumen en el mismo orden que se publican, pero implica que Kafka sólo garantiza el orden a nivel de partición, no de tópico.

API

Kafka está escrito en Scala y Java, aunque existen clientes para prácticamente todos los lenguajes de programación de uso más habitual. El cliente para Java es desarrollado por el propio proyecto de forma oficial.

Kafka expone su funcionalidad a través de un protocolo sobre TCP mediante el que oferta servicios agrupados en base a su cometido. Cada grupo de servicios es un API distinto. Cinco de ellos constituyen el núcleo del sistema. Producer API permite publicar registros en los tópicos. Consumer API permite suscribirse a los tópicos y consumir registros. Streams API permite consumir registros de tópicos para volver a publicarlos transformados en otros tópicos. Connector API permite construir pasarelas entre Kafka y otros sistemas, como por ejemplo una base de datos relacional. Y AdminClient API que permite gestionar los distintos componentes que conforman una instancia o cluster de Kafka.

Para el caso de uso de Kafka como un sistema de procesamiento de streams, además del API, existe un proyecto llamado KSQL que se liberó este mismo año y permite escribir procesos que consuman streams con un lenguaje similar a SQL.

Resumiendo, Kafka es una plataforma que aúna distintas características que permiten utilizarlo como un sistema de almacenamiento, un sistema de mensajería, y un sistema de procesamiento de streams muy eficiente y altamente escalable.