Skip to content

java

Recorridos aleatorios en Java

La forma habitual de desordenar una lista en Java es utilizar el método Collections.shuffle. El inconveniente de este método es que modifica la lista original, un efecto lateral no deseable en el mundo actual donde predominan las colecciones inmutables. La alternativa más obvia es crear una nueva lista, a costa lógicamente de duplicar el consumo de memoria.

Un enfoque distinto es recorrer la lista de forma desordenada. La premisa de tal tipo de recorrido es que se retornen todos los elementos de la lista de manera aleatoria y que cada uno de ellos sólo se retorne una única vez. La condición es que la lista sea de tamaño fijo, preferentemente inmutable; lo contrario podría dar lugar a implementaciones con comportamientos indefinidos no predecibles. Y la restricción es que la operación se ejecute de forma secuencial, no paralela, ya que realizar particiones de la lista no permitiría acceder a todos los elementos de manera aleatoria.

Una implementación de tal tipo de recorrido implica generar un número aleatorio en cada iteración que indique el índice del elemento a retornar. Además de almacenar todos los índices generados para evitar duplicados. Para ello, para una lista de tamaño  n, se puede crear un array de n índices, inicializar dicho array secuencialmente con valores de 0 a n, en cada iteración i generar un número aleatorio r de i a n – 1, intercambiar en el array el valor en la posición i por el valor en la posición r, y retornar el elemento de la lista que se encuentra en la posición indicada por el valor almacenado en la posición i del array. Es decir, sustituir el elemento de la iteración en curso por otro de forma aleatoria y que aún no se haya retornado.

Partiendo de que en la actualidad se prefiere consumir colecciones con streams, en vez de con bucles for, la solución pasa por la implementación de un iterador, y de forma más concreta de un Spliterator<T>. Esta clase se introdujo con Java 8 para facilitar la construcción de iteradores complejos que pudieran consumir elementos tanto de forma secuencial como paralela, además de tener la capacidad de describirse a ellos mismos indicando cuales son sus características.

SplitIterator<T> es una interface que requiere implementar varios métodos, pero para la mayoría de los casos de uso, que no requieran soportar paralelismo, se puede simplemente extender de Spliterators.AbstractSpliterator<T>. Esta clase sólo requiere implementar el método boolean tryAdvance(Consumer<? super T> action). Un método que es equivalente a los clásicos métodos hasNext y next de los iteradores de Java, ya que a un mismo tiempo consume el elemento en curso e indica si quedan más elementos por consumir.

Una posible implementación de tal algoritmo podría ser la siguiente:

public static class RandomListSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

  private final SplittableRandom random = new SplittableRandom();
  private final List<T> elements;
  private final int size;
  private final int[] indexes;
  private int current = 0;

  public RandomListSpliterator(List<T> elements, int additionalCharacteristics) {
    super(elements.size(), additionalCharacteristics);

    this.elements = elements;
    this.size = elements.size();

    // Genera el array de índices inicial
    this.indexes = IntStream.range(0, this.size).toArray();
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> consumer) {

    // Retorna inmediatamente si ya se ha recorrido la lista completa
    if (current == size) {
      return false;
    }

    // Obtiene el siguiente elemento de la lista de forma aleatoria
    final var element = next();

    // Consume el elemento obtenido
    consumer.accept(element);

    return true;
  }

  protected T next() {

    // Genera un índice sobre la lista de forma aleatoria
    final var rand = random.nextInt(current, size);

    // Intercambia el índice en la posición actual por el de la posición aleatoria
    final var index = indexes[rand];
    indexes[rand] = indexes[current];

    // Avanza a la siguiente posición del array para la siguiente iteración
    current ++;

    // Retorna el elemento que se encuentra en el índice de la posición aleatoria
    return elements.get(index);
  }
}

En el código, elements almacena la lista original y size su tamaño, indexes es el array de índices recorridos y current el número de la iteración en curso. El método tryAdvance consume el elemento en curso, y el método next realiza el proceso de generación del número aleatorio y actualización del array de índices. El código debería resultar bastante sencillo de entender con los comentarios.

Notar el uso de la clase SplittableRandom para la generación de números aleatorios. Esta clase se introdujo en Java 8, junto con SecureRandom cuando la aleatoriedad de la secuencia de números generados deba ser más estricta y cumplir criterios más estrictos como los requeridos en el mundo de la criptografía.

El iterador del código de ejemplo se puede probar creando una lista, una instancia del iterador, y procesando el conjunto como un stream con StreamSupport.stream:

var elements = IntStream.range(50, 60)
  .boxed()
  .collect(Collectors.toUnmodifiableList());

var characteristics = Spliterator.IMMUTABLE | 
  Spliterator.SIZED | 
  Spliterator.NONNULL;

var spliterator = new RandomListSpliterator<>(elements, characteristics);

StreamSupport.stream(spliterator, false)
  .forEach(System.out::println);

En cada ejecución la salida será distinta, devolviéndose los elementos de la lista original en un orden aleatorio cada vez.

La variable characteristics establece las características de la lista para facilitar a los clientes del iterador la toma de decisiones. IMMUTABLE indica que es inmutable, SIZED que tiene un tamaño prederminado, y NONNULL que no contiene nulos.  De la forma acostumbrada, es recomendable revisar la documentación para ver todas las opciones disponibles. En la práctica sería recomendable diseñar el iterador para un determinado caso de uso evitando tener que pasar dichos parámetros cada vez.

De cualquier forma, la implementación propuesta funciona, pero es muy básica. Por ejemplo, el array de índices se inicializa a costa de recorrerlo completamente, algo que puede evitarse tratando el cero como caso especial, ya que ese el valor que por defecto asigna Java a las variables de tipo int. Si un índice es cero entonces se puede interpretar como que no se ha visitado todavía y que su valor es la propia posición que ocupa, ya que ese es el valor con el que se debería haber inicializado originalmente:

public static class RandomListSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

  private final SplittableRandom random = new SplittableRandom();
  private final List<T> elements;
  private final int size;
  private final int[] indexes;
  private int current = 0;

  public RandomListSpliterator(List<T> elements, int additionalCharacteristics) {
    super(elements.size(), additionalCharacteristics);

    this.elements = elements;
    this.size = elements.size();

    // Crea el array de índices sin inicializarlo
    this.indexes = new int[this.size];
  }

  @Override
  public boolean tryAdvance(Consumer<? super T> consumer) {
    if (current == size) {
      return false;
    }

    final var element = next();

    consumer.accept(element);

    return true;
  }

  protected T next() {

    // Genera un índice aleatorio
    final var rand = random.nextInt(current, size);

    // Obtiene el índice sobre la posición actual y la aleatoria
    final var src = indexes[current ++];
    final var dst = indexes[rand];

    // Si el índice es cero entonces no se ha visitado y se usa la posición actual
    indexes[rand] = src == 0 ? current : src;

    // Si el índice es cero entonces no se ha visitado y se usa la posición aleatoria
    return elements.get(dst == 0 ? rand: dst - 1);
  }
}

De igual forma sería interesante pensar en reutilizar los arrays de índices cuando se procesen múltiples listas del mismo tamaño de forma secuencial, para evitar la reserva de memoria necesaria para crear dichos arrays por cada lista. Para ello se podría utilizar una variable que almacenase el valor del índice utilizado como caso especial, en vez utilizar el valor constante cero. Al guardar un índice en el array se tendría que almacenar sumándole el valor de dicha variable y compararlo también contra dicho valor. Así, en el primer uso del array el valor de la variable sería cero y el funcionamiento sería el mismo que el del código anterior. Pero al terminar el primer recorrido, y posteriores, se le sumaría a la variable el tamaño de la lista recorrida. En el segundo recorrido, y posteriores, los índices se almacenarían sumándoles dicho valor y se compararían contra dicho valor. Si el índice es menor que dicho valor entonces se consideraría que no se ha visitado. Por seguridad, cada cierto número de recorridos se podría inicializar el array y la variable a cero.

Por último, comentar que en el caso de realmente querer recorrer una lista de forma aleatoria en paralelo, una posible solución sería generar primero el array con los índices desordenados, y luego tratar en paralelo este array de forma particionada, en vez de particionar la lista como normalmente se suele hacer.

Benchmarks en Java con JMH

JMH (Java Microbenchmark Harness) es una herramienta para realizar benchmarks en Java. Se desarrolla como parte de OpenJDK y basa su funcionamiento en Maven. Su propósito es el de medir dos implementaciones distintas de un mismo diseño. Lo que en la práctica quiere decir que se utiliza para escribir dos métodos y ver cuál de ellos es más rápido. JMH tiene en cuenta muchos aspectos del funcionamiento interno de la máquina virtual de Java y se asegura de que el código probado se pruebe correctamente, algo no tan sencillo como a priori pudiera parecer.

Hacer un bucle que ejecute un millón de veces un método e imprimir al principio y final la hora para ver el tiempo transcurrido no sirve para nada. La máquina virtual de Java puede decidir optimizar el código y eliminarlo completamente de la ejecución si concluye que realmente no realiza ninguna operación efectiva. El tiempo medido en ese supuesto sería extremadamente pequeño, pero dentro del contexto de un programa real, donde el código sí que se ejecutase, podría ser muy elevado. Herramientas como JMH hacen todo lo posible para que las pruebas sean fiables.

Para realizar pruebas con JMH hay que generar un proyecto con Maven, preferentemente desde línea de comandos, aunque también se pueden generar desde un IDE. Lo que si se recomienda es utilizar la línea de comandos para ejecutarlos y no añadir ningún componente externo que pueda provocar efectos laterales durante la ejecución de las pruebas.

El comando básico de creación de un proyecto a partir del arquetipo de Maven para JMH es el siguiente:

mvn archetype:generate
  -DarchetypeGroupId=org.openjdk.jmh
  -DarchetypeArtifactId=jmh-java-benchmark-archetype
  -DinteractiveMode=false 
  -DgroupId=<GROUP_ID>
  -DartifactId=<ACTIFACT_ID>
  -Dversion=<VERSION>

Como resultado de la ejecución se crea un proyecto con dos ficheros dentro de un nuevo directorio. El primero de ellos es el clásico pom.xml con la configuración del proyecto Maven, y el segundo una clase de ejemplo que puede utilizarse como plantilla para crear benchmarks. El proyecto generado se puede compilar para crear un jar y ejecutarlo con los siguientes comandos dentro del nuevo directorio creado:

mvn clean install

java -jar target/benchmarks.jar

Si se está utilizando Java 9 o superior puede resultar en un error java.lang.NoClassDefFoundError: javax/annotation/Generated debido al nuevo sistema de módulos de Java. Este error se puede resolver configurando un parámetro en la máquina virtual de Java que ejecuta Maven, más concretamente indicando que cargue el módulo que contiene la clase que no se encuentra:

set MAVEN_OPTS=--add-modules java.xml.ws.annotation

Como resultado de la ejecución del benchmark se muestra por consola el progreso de la prueba, que consiste en varias ejecuciones de un proceso de precalentamiento de la máquina virtual, necesario para llevarla a un estado lo más predecible posible, varias ejecuciones de la clase de prueba, y finalmente las estadísticas resultantes.

Se pueden consultar todas las opciones disponibles para la ejecución del benchmark añadiendo el clásico parámetro -help en la línea de comandos con java -jar target/benchmarks.jar -help. Y es recomendable hacerlo las primeras veces para hacerse una idea de cómo se puede personalizar la ejecución.

Lógicamente la ejecución anterior con las opciones por defecto de una clase de ejemplo que se limita a probar un método vacío no aporta ningún valor. La idea es que se añada dentro del pom.xml la dependencia con el código que se quiera probar y se escriban clases que prueben dicho código. JMH proporciona en su documentación ejemplos de cómo escribir clases de prueba.

La escritura de clases de prueba para JMH se realiza mediante el uso de anotaciones de forma similar a como se realiza en otros frameworks.

  • @Benchmark es la anotación básica que debe aplicarse sobre los métodos públicos que se quieran probar.
  • @BenchmarkMode permite especificar el modo de realizar la prueba, pudiéndose elegir entre Throughput, que llama continuamente a los métodos probados y mide el rendimiento total, AverageTime, que llama continuamente a los métodos probados y mide el tiempo medio de ejecución, SampleTime, que llama continuamente a los métodos probados y mide el tiempo de ejecución tomando muestras aleatorias de la duración de los mismos, SingleShotTime, que llama una sola vez a los métodos probados y mide el tiempo de ejecución sin enmascarar la penalización por el precalentamiento, y All, que ejecuta todos los modos.
  • @OutputTimeUnit permite indicar la unidad de tiempo en que se quiere mostrar los resultados, pudiéndose elegir cualquier valor del enumerado java.util.concurrent.TimeUnit.
  • @Measurement y @Warmup permiten controlar el proceso de prueba y precalentamiento respectivamente mediante parámetros tales como el número de iteraciones a realizar o la duración de las mismas. Otras anotaciones como @Fork, @Threads, @CompilerControl y @Param controlan detalles de más bajo nivel referentes a la creación de hilos de ejecución y la compilación de las clases de prueba.
  • @State se utiliza para crear clases que se puedan pasar como argumentos a los métodos marcados con @Benchmark. En la anotación se debe indicar el ámbito de los objetos creados, pudiéndose elegir entre Thread, para que todas las instancias sean distintas, Benchmark, para que todas las instancias de una misma clase se compartan entre todos los threads, o Group, para que todas las instancias de una misma clase se compartan entre los threads de un grupo. Los grupos se definen con las anotaciones @Group y @GroupThread que permiten probar varios métodos a la vez en una misma iteración.
  • @Setup y @Teardown permiten ejecutar procesos de inicialización y finalización de una clase anotada con @State. De hecho, sólo se pueden utilizar sobre métodos de clases anotadas con @State. Se invocan en función de un argumento de las propias anotaciones, pudiéndose elegir entre Trial, para que se invoquen por cada prueba, Iteration, para que se invoquen en cada iteración de las pruebas, e Invocation, para que se invoquen en cada ejecución del método, aunque este último no se recomienda sin leer la documentación y entender sus implicaciones.

Muchas de las anotaciones son heredables, por lo que es sencillo escribir clases base de las que heredar el comportamiento y crear suites de pruebas. O clases concretas para realizar pruebas más específicas, como en el siguiente ejemplo:

@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=5, time=50, timeUnit=TimeUnit.MILLISECONDS)
@Measurement(iterations=10, time=100, timeUnit=TimeUnit.MILLISECONDS)
@Fork(value=1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class RandomBenchmark {

  @Benchmark
  public double testMathRandom() {
    return Math.random();
  }

  @Benchmark
  public double testRandom(StatedRandom stated) {
    return stated.getRandom().nextDouble();
  }

  @Benchmark
  public double testThreadLocalRandom() {
    return ThreadLocalRandom.current().nextDouble();
  }

  @State(Scope.Benchmark)
  public static class StatedRandom {

    private Random random;

    public Random getRandom() {
      return random;
    }

    @Setup public void setup() {
      random = new Random();
    }
  } 
}

Como se observa, los métodos de prueba no deben contener bucles, sólo la invocación a la funcionalidad que se quiera probar. JMH ya realiza las iteraciones pertinentes. No obstante, si fuera realmente necesario por alguna razón técnica hacer un bucle que invoque a la funcionalidad que se quiere probar, se puede utilizar la anotación @OperationsPerInvocation(n) sobre el método que contiene el bucle para indicar que dicho método realiza n iteraciones.

Otro detalle a tener en cuenta es que resulta conveniente retornar algún valor desde los métodos de prueba, para evitar que la máquina virtual considere el código como muerto y lo optimice eliminando su ejecución. De igual forma, si el valor retornado por el método de prueba no depende del estado de ningún objeto y el resultado es predecible, la máquina virtual puede optimizarlo reemplazándolo por un valor constante, o eliminándolo completamente si concluye que no hay ningún proceso posterior que lo utilice. Para evitar estos y otros problemas se debe utilizar la clase BlackHole de JMH, que consume todo tipo de objetos y hace todo lo posible por garantizar que la prueba sea lo más fiable posible.

@Benchmark public void testMathRandomBh(Blackhole bh) {
  bh.consume(Math.random());
}

Para terminar, comentar además otras dos características interesantes que ofrece JMH. La primera de ellas es un API para Java que permite crear programas que ejecuten el propio JMH desde un main. Y la segunda una serie de profiles, para pruebas más sofisticadas, que permiten controlar aspectos tales como el tiempo consumido por el compilador JIT (Just In Time) o el Garbage Collector, por citar sólo algunas de sus posibilidades.

Reactive Streams en Java (y 3)

Implementar un stream que cumpla todos los requerimientos exigidos por la especificación de Reactive Streams requiere bastante atención a los detalles. Puede llegar a ser complicado verificar que todas y cada una de las reglas de la especificación se cumplen. Afortunadamente, el mismo grupo de trabajo que desarrolla la especificación también ofrece Reactive Streams Technology Compatibility Kit (TCK), una herramienta que prueba implementaciones de reactive streams y verifica si cumplen las reglas.

TCK prueba prácticamente todas las reglas, pero no todas, ya que hay unas pocas reglas que no pueden probarse de forma automatizada debido a la naturaleza de las mismas, como por ejemplo comprobar que un productor realmente genera infinitos elementos. En todo caso, TCK prueba todas las reglas importantes y es de gran ayuda para los desarrolladores.

TCK es una librería de código abierto desarrollada en Java disponible en Maven. No obstante, hay que tener en cuenta que existen dos versiones de la librería, una anterior a Java 9 y otra posterior. La anterior permite probar streams que implementen las interfaces propuestas originalmente por el grupo de trabajo que desarrolla la especificación. La posterior permite probar las nuevas interfaces del paquete java.util.concurrent.Flow ofrecidas de forma nativa por Java 9 y lógicamente es la que se recomienda utilizar hoy en día.

Para incluir TCK en un proyecto basta con añadir la dependencia de Maven correspondiente en el fichero pom.xml de la forma acostumbrada:

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck-flow</artifactId>
  <version>1.0.2</version>
  <scope>test</scope>
</dependency>

TCK ofrece cuatro clases base de las que se puede extender para escribir pruebas basadas en TestNG. La clase PublisherVerification<T> se debe utilizar para probar publicadores. Las clases SubscriberWhiteboxVerification<T> y SubscriberBlackboxVerification<T> se deben utilizar para probar suscriptores, siendo recomendable utilizar la primera, aunque implique más esfuerzo, ya que es capaz de probar algunos casos que la segunda no puede. Y por último, la clase IdentityProcessorVerification<T> que debe utilizarse para probar procesadores. La documentación es detallada y cubre bastantes aspectos del uso de la librería.

Como ejemplo, partamos de la siguiente implementación de un publicador que no hace absolutamente nada:

public class TestPublisher implements Publisher<Integer> {

  @Override
  public void subscribe(Subscriber<? super Integer> subscriber) {
  }
}

Para probar el publicador se debe escribir una clase que extienda de PublisherVerification<T>. En el constructor se debe inicializar el entorno de ejecución creando una instancia de TestEnviroment, que permite especificar algunos parámetros, como por ejemplo timeouts o el nivel de detalle del log. Y se debe implementar dos métodos, createPublisher, para crear una instancia del publicador que se quiere probar, y createFailedPublisher, para crear una instancia del publicador que se quiere probar en condiciones específicas de error, aunque esto último se puede ignorar retornando null  desde el método.

@Test
public class TestPublisherTest extends FlowPublisherVerification<Integer> {

  public TestPublisherTest() {
    super(new TestEnvironment());
  }

  @Override
  public Publisher<Integer> createFlowPublisher(long elements) {
    return new TestPublisher();
  }

  @Override
  public Publisher<Integer> createFailedFlowPublisher() {
    return null;
  }
}

La forma de ejecutar las pruebas con TestNG depende del entorno en el que se esté trabajando. Desde Eclipse basta con instalar un plugin desde el Marketplace, que añade opciones que permiten ejecutar directamente las clases de prueba. No obstante, TCK depende de una versión muy antigua de TestNG, por lo que es necesario excluirla y añadir una versión más moderna para que el plugin funcione:

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck-flow</artifactId>
  <version>1.0.2</version>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>org.testng</groupId>
      <artifactId>testng</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.testng</groupId>
  <artifactId>testng</artifactId>
  <version>6.14.3</version>
</dependency>

En buena lógica, el resultado de la ejecución de la prueba sobre el productor de ejemplo, que no implementa nada, retorna una gran cantidad de errores:

FAILED: required_createPublisher3MustProduceAStreamOfExactly3Elements
FAILED: required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements
FAILED: required_spec102_maySignalLessThanRequestedAndTerminateSubscription
FAILED: required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates
...
Tests run: 38, Failures: 20, Skips: 16

Los errores devueltos por TCK son de la forma TYPE_spec###_DESC. Donde TYPE puede ser required, optional, stochastic o untested, e indica si la regla es obligatoria, opcional, no verificable, o no implementada. ### es el número de regla, siendo el primer número el apartado de la especificación y los dos últimos el número de la regla dentro del apartado. Y DESC es una breve descripción de la regla.

La implementación original de nuestro productor de ejemplo se puede mejorar implementando una suscripción y retornándola cuando un suscriptor lo solicite, aunque sin implementar aún nada realmente:

public class TestPublisher implements Publisher<Integer> {

  public static class TestSubscription implements Subscription {

    @Override
    public void request(long n) {
    }

    @Override
    public void cancel() {
    }
  }

  @Override
  public void subscribe(Subscriber<? super Integer> subscriber) {
    Subscription subscription = new TestSubscription();

    subscriber.onSubscribe(subscription);
  }
}

Si se vuelve a ejecutar la prueba, se observa como el número de errores se reduce:

Tests run: 38, Failures: 14, Skips: 14

Si se estudia la lista de pruebas ejecutadas con éxito se puede observar que algunas pasan de casualidad debido a algún efecto lateral. Por ejemplo, la prueba required_spec109_subscribeThrowNPEOnNullSubscriber se ejecuta con éxito porque el método utiliza directamente el suscriptor recibido, sin comprobar si es nulo, y eso hace que se eleve un NullPointerException (NPE) tal y como exige la regla. La solución obvia es comprobar que el parámetro no es nulo antes de utilizarlo:

@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
  Objects.requireNonNull(subscriber);
…

El código se puede seguir evolucionando para hacer que implemente más reglas de la especificación, como por ejemplo la relativa a la prueba optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage que exige que se debe señalizar un error con una instancia de IllegalArgumentException si se solicitan cero o menos elementos:

public static class TestSubscription implements Subscription {

  private final Subscriber<? super Integer> subscriber;
  
  public TestSubscription(Subscriber<? super Integer> subscriber) {
    this.subscriber = subscriber;
  }

  @Override
  public void request(long n) {
    if (n <= 0L) {
      subscriber.onError(new IllegalArgumentException("3.9");
    …

Llegado este punto ya debería quedar la clara la utilidad de TCK, que no sólo se limita a facilitar las pruebas, sino que ayuda a delimitar la funcionalidad pendiente de implementar para cumplir con los requerimientos de la especificación de Reactive Streams. El propio repositorio de código de TCK incluye unas cuantas clases de ejemplo donde se puede observar la complejidad necesaria para cumplir con algunas de las reglas.

En líneas generales, implementar un reactive stream no se considera una tarea abordable sin un estudio previo de la especificación y el conocimiento de aspectos concretos del funcionamiento de Java, como por ejemplo la sincronización de procesos concurrentes. El propio Java proporciona la clase SubmissionPublisher<T> para facilitar la creación de publicadores asíncronos, y se recomienda su utilización si cubre el caso de uso que se debe implementar en vez de realizar un desarrollo partiendo completamente de cero.

Reactive Streams en Java (2)

Reactive Streams es una iniciativa para definir los requerimientos y las interfaces que deben cumplir los reactive streams, así como para proporcionar las herramientas necesarias para comprobar que las implementaciones cumplan todos los requerimientos. Posiblemente sea el esfuerzo más relevante llevado a cabo en la materia, y de hecho, Java 9 incorporó al JDK de forma nativa las interfaces propuestas por esta iniciativa.

Los requerimientos detallados que deben cumplir cada uno de los distintos componentes que intervienen en un reactive stream están definidos en una especificación publicada por uno de los grupos de trabajo de Reactive Streams. Es interesante conocerlos, aunque sólo sea a través de una lectura rápida, para entender mejor el comportamiento esperado para este tipo de streams.

1. Publisher

1. Un publicador DEBE enviar a un suscriptor un número total de elementos igual o menor que el número total de elementos solicitados por dicho suscriptor.

2. Un publicador PODRÍA enviar menos elementos que los solicitados por un suscriptor, por ejemplo cuando ya no sea capaz de producir más elementos a partir de su fuente origen de información.

3. Un publicador DEBE invocar los métodos onSubscribe, onNext, onError y onComplete de un suscriptor de forma thread-safe y utilizando algún mecanismo de sincronización en caso de utilizar múltiples threads.

4. Un publicador DEBE comunicar sus errores invocando el método onError de sus suscriptores.

5. Un publicador DEBE comunicar que ya no puede generar más elementos invocando el método onComplete de sus suscriptores.

6. Un publicador que invoque los métodos onError u onComplete de un suscriptor DEBE considerar cancelada dicha suscripción.

7. Un publicador no DEBERÍA invocar ningún otro método de un suscriptor después de haber invocado sus métodos onError u onComplete.

8. Un publicador DEBE dejar de invocar eventualmente métodos de un suscriptor si su suscripción ha sido cancelada. No obstante, debido a la naturaleza asíncrona del proceso, y la demora entre la petición de cancelación y su cancelación efectiva, podría llegar a invocar alguno de sus métodos.

9. Un publicador DEBE, dentro la implementación de su método subscribe, invocar el método onSubscribe del suscriptor pasado como parámetro antes de invocar cualquier otro de método de dicho suscriptor. El método debe terminar normalmente, excepto si el suscriptor pasado como parámetro es nulo, en cuyo caso debe elevar una excepción de tipo NullPointerException. Cualquier otro tipo de error debe realizarse invocando el método onError del suscriptor.

10. Un publicador NO DEBE permitir la suscripción de un mismo suscriptor más de una vez.

11. Un publicador PUEDE decidir el número de distintos suscriptores que soporta a un mismo tiempo y si la implementación de las suscripciones la realiza de forma unicast (transmisión de uno a uno) o multicast (transmisión de uno a muchos).

2. Subscriber

1. Un suscriptor DEBE llamar al método request de Subscription para empezar a recibir elementos generados por la suscripción. Es decir, es responsabilidad del suscriptor indicar al publicador cuando quiere empezar a recibir y cuantos elementos está preparado para recibir.

2. Un suscriptor DEBERÍA procesar un elemento de forma asíncrona si estima que su tiempo de respuesta puede afectar la calidad de servicio del publicador.

3. Un suscriptor NO DEBE llamar a ningún método de Subscription ni Publisher dentro de sus métodos onComplete y onError para evitar ciclos y condiciones de carrera.

4. Un suscriptor DEBE considerar cancelada una suscripción cuando el publicador invoca su método onComplete u onError.

5. Un suscriptor DEBE llamar al método cancel de Subscription cuando un publicador invoca su método onSubscribe teniendo el suscriptor ya una suscripción activa.

6. Un suscriptor DEBE llamar al método cancel de Subscription  cuando la suscripción ya no sea necesaria con el objetivo de permitir liberar los recursos asociados a dicha suscripción.

7. Un suscriptor DEBE asegurar que todas las llamadas a los métodos de Subscription se realizan desde un mismo thread, o utilizando algún método de sincronización en caso de que la suscripción se esté usando concurrentemente desde distintos threads.

8. Un suscriptor DEBE estar preparado para recibir elementos a través de su método onNext, aún después de haber llamado al método cancel de Subscription, debido a la naturaleza asíncrona del proceso y la demora que se produce entre que el suscriptor solicita la cancelación y el publicador la cancela realmente.

9. Un suscriptor DEBE estar preparado para recibir la finalización de la suscripción a través de su método onComplete, incluso antes de haber llamado al método request de Subscription, debido a que el publicador tiene potestad para dar por terminada una suscripción.

10. Un suscriptor DEBE estar preparado para recibir la finalización de la suscripción a través de su método onError, incluso antes de haber llamado al método request de Subscription, debido a errores en el publicador.

11. Un suscriptor DEBE asegurar que el procesamiento asíncrono de sus métodos onSubscribe, onNext, onComplete y onError se realiza de manera thread-safe.

12. Un mismo suscriptor DEBE suscribirse a un mismo publicador una única vez.

13. Un suscriptor DEBE terminar las invocaciones a sus métodos onSubscribe, onNext, onComplete y onError sin elevar ninguna excepción. Excepto cuando algún parámetro recibido sea nulo, en cuyo caso deberá elevar una excepción de tipo NullPointerException. Cualquier otro error debe resolverse cancelando la suscripción. El publicador no puede tratar los errores del suscriptor, por lo que dará por cancelada la subscripción en caso de producirse alguno.

3. Subscription

1. Una suscripción es el único componente a través del que se establece la comunicación entre un suscriptor y un publicador. Un objeto Subscription concreto pertenece de forma exclusiva a un Publisher y un Subscriber concretos. Sólo el publicador DEBE llamar a a los métodos request y cancel  de Subscription.

2. Una suscripción DEBE permitir que se llame a su método request desde los métodos onSuscribe y onNext de un suscriptor de manera síncrona. Es decir, la implementación del método request debe ser reentrante.

3. Una suscripción DEBE establecer un número máximo de llamadas recursivas entre los métodos request de Subscriber y onNext de Subscription. La recomendación a este respecto es limitar a uno el número de llamadas recursivas evitando la secuencia request -> onNext -> request -> onNext -> …

4. Una suscripción DEBERÍA implementar su método request de forma que retorne lo antes posible para no afectar la calidad de servicio del suscriptor que lo invoque.

5. Una suscripción DEBE implementar su método cancel de forma idempotente, thread-safe y retornar lo antes posible para no afectar la calidad del servicio del suscriptor que lo invoque.

6. Una suscripción NO DEBE realizar ninguna acción cuando se llame a su método request una vez que la suscripción haya sido cancelada.

7. Una suscripción NO DEBE realizar ninguna acción cuando se llame a su método cancel una vez que la suscripción haya sido cancelada.

8. Una suscripción DEBE almacenar la suma de todas las cantidades de elementos solicitadas por todas las llamadas realizadas por el suscriptor a su método request.

9. Una suscripción DEBE invocar al método onError del suscriptor con una instancia de una excepción IllegalArgumentException si se solicita una cantidad de elementos igual o menor que cero en alguna llamada realizada por el suscriptor a su método request.

10. Un suscripción PODRÍA implementar en su método request invocaciones al método onNext del suscriptor de manera síncrona. Siempre y cuando la suscripción no haya sido ya cancelada.

11. Un suscripción PODRÍA implementar en su método request invocaciones al método onComplete del suscriptor de manera síncrona. Siempre y cuando la suscripción no haya sido ya cancelada.

12. Una suscripción DEBE requerir al Publisher que deje de invocar los métodos del Subscriber cuando el suscriptor llame a su método cancel . Siempre y cuando la suscripción no haya sido ya cancelada.

13. Una suscripción DEBE requerir del Publisher que libere cualquier referencia al Subscriber  cuando el suscriptor llame a su método cancel. Siempre y cuando la suscripción no haya sido cancelada.

14. Una suscripción PODRÍA causar que el Publisher pase a un estado de finalizado cuando el suscriptor llame a su método cancel. Siempre y cuando la suscripción no haya sido ya cancelada.

15. Una suscripción NO DEBE elevar excepciones en la implementación de su método cancel .

16. Una suscripción NO DEBE elevar excepciones en la implementación de su método request.

17. Una suscripción DEBE permitir peticiones de una cantidad de elementos potencialmente infinita. Una petición de 2^63-1 (java.lang.Long.MAX_VALUE) o más elementos se considera como una petición de una cantidad infinita de elementos.

4. Processor

1. Un procesador DEBE cumplir tanto los requerimientos establecidos para los publicadores como para los suscriptores.

2. Un procesador PODRÍA elegir recuperarse de una invocación a su método onError por parte de un publicador, pero en ese caso DEBE considerar la suscripción cancelada, en caso contrario DEBE propagar el error inmediatamente invocando el método onError de sus suscriptores.

Reactive Streams en Java (1)

Java 9 introdujo la clase java.util.concurrent.Flow en un intento de estandarizar de forma nativa la implementación de reactive streams. Una propuesta de diseño que propugna la creación de streams asíncronos y no bloqueantes, prestando especial atención a que la fuente origen de la información, potencialmente infinita, no sature al destinatario.

Los reactive streams asemejan al patrón Observer, con un modelo de publicadores y suscriptores. Los publicadores generan una secuencia de elementos que envían a los suscriptores, pero teniendo además estos últimos la capacidad de limitar el número de elementos que quieren recibir.

Java ha añadido unas interfaces con el mínimo número de métodos necesarios para implementar reactive streams, pero no una vasta colección de implementaciones de dichas interfaces. Lo que es importante, es que con la creación de todas estas interfaces dentro del propio JDK se unifican por parte de Java las distintas propuestas existentes actualmente para la implementación de reactive streams. Es de esperar que en un futuro todas las librerías de terceros utilicen estas interfaces en vez de crear las suyas propias.

Flow.Publisher<T>

La interface funcional Flow.Publisher<T> representa los publicadores.

@FunctionalInterface
public static interface Publisher<T> {

  public void subscribe(Subscriber<? super T> subscriber);
}

El método subscribe permite que los suscriptores se suscriban a un publicador. El método recibe una instancia de Subscriber a través de la que el publicador se comunicará con el suscriptor. El publicador debe elevar una excepción de tipo NullPointerException si la instancia de Subscriber pasada como parámetro es nula.

Flow.Subscriber<T>

La interface Flow.Subscriber<T> representa los suscriptores. Es utilizada por los publicadores para notificar eventos a los suscriptores.

public static interface Subscriber<T> {

  public void onSubscribe(Subscription subscription);

  public void onNext(T item);

  public void onError(Throwable throwable);

  public void onComplete();
}

El método onSubscribe es llamado por el publicador después de que el suscriptor llame al método suscribe del publicador y este último acepte la suscripción. Recibe una instancia de Subscription a través de la que el suscriptor podrá controlar la suscripción.

El método onNext es llamado por el publicador para pasar al suscriptor los elementos generados por la suscripción.

El método onComplete es llamado por el publicador cuando la suscripción termina, ya sea porque se ha consumido el número de elementos solicitado por el suscriptor, o el publicador ha consumido totalmente los elementos de su fuente origen de la información.

El método onError es llamado por el publicador cuando se produce un error. Después de llamar a este método la suscripción se considera terminada y ningún otro método será invocado. Notar que no se lanza una excepción en caso de error, sino que el publicador pasa una instancia de una excepción al suscriptor. Por ejemplo, este método es invocado con una excepción de tipo IllegalStateException cuando un suscriptor intenta suscribirse y ya está suscrito, o se produce un error de cualquier otro tipo durante el proceso de suscripción.

De forma general el comportamiento es indefinido si se produce un error y se eleva una excepción durante la invocación de cualquiera de estos métodos por parte del suscriptor.

Flow.Subscription

La interface Flow.Subscription representa las suscripciones. Es utilizada por los suscriptores para controlar la suscripción.

public static interface Subscription {

  public void request(long n);

  public void cancel();
}

El método request es llamado por el suscriptor para notificar al publicador que está en disposición de recibir elementos de la suscripción. Se puede llamar tantas veces como se quiera. El parámetro indica el número de elementos que está en disposición de recibir además de los ya solicitados. Si se solicita cero o menos elementos el publicador llamará al método onError del suscriptor con una instancia de la excepción IllegalArgumentException. Si se solicita Long.MAX_VALUE el publicador interpretará que el suscriptor quiere recibir todos los elementos generados por la suscripción sin ningún tipo de restricción en cuanto a su volumen, potencialmente infinito.

El método cancel es llamado por el suscriptor para solicitar al publicador que cancele la suscripción. Después de llamar este método, ni onComplete ni onError serán invocados por el publicador, pero onNext si puede llegar a ser invocado por el publicador debido a la naturaleza asíncrona de todo el proceso.

Flow.Processor<T, R>

La interface Flow.Processor<T, R> representa un componente que se comporta tanto como un publicador como un suscriptor.

public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

El propósito principal de esta interface es la de ser implementada por componentes intermedios que se dediquen a realizar algún tipo de transformación sobre los datos recibidos de un publicador y publicarlos a su vez transformados para otros componentes.

Para terminar, es interesante notar el cuidado diseño de todas estas interfaces. Todas ellas establecen mecanismos de comunicación entre dos componentes en una única dirección. Todos los métodos tienen void como tipo retornado y admiten a lo sumo un parámetro. Y los objetos se intercambian entre los distintos componentes mediante invocaciones a métodos.