Skip to content

Reactive Streams en Java (2)

Reactive Streams es una iniciativa para definir los requerimientos y las interfaces que deben cumplir los reactive streams, así como para proporcionar las herramientas necesarias para comprobar que las implementaciones cumplan todos los requerimientos. Posiblemente sea el esfuerzo más relevante llevado a cabo en la materia, y de hecho, Java 9 incorporó al JDK de forma nativa las interfaces propuestas por esta iniciativa.

Los requerimientos detallados que deben cumplir cada uno de los distintos componentes que intervienen en un reactive stream están definidos en una especificación publicada por uno de los grupos de trabajo de Reactive Streams. Es interesante conocerlos, aunque sólo sea a través de una lectura rápida, para entender mejor el comportamiento esperado para este tipo de streams.

1. Publisher

1. Un publicador DEBE enviar a un suscriptor un número total de elementos igual o menor que el número total de elementos solicitados por dicho suscriptor.

2. Un publicador PODRÍA enviar menos elementos que los solicitados por un suscriptor, por ejemplo cuando ya no sea capaz de producir más elementos a partir de su fuente origen de información.

3. Un publicador DEBE invocar los métodos onSubscribe, onNext, onError y onComplete de un suscriptor de forma thread-safe y utilizando algún mecanismo de sincronización en caso de utilizar múltiples threads.

4. Un publicador DEBE comunicar sus errores invocando el método onError de sus suscriptores.

5. Un publicador DEBE comunicar que ya no puede generar más elementos invocando el método onComplete de sus suscriptores.

6. Un publicador que invoque los métodos onError u onComplete de un suscriptor DEBE considerar cancelada dicha suscripción.

7. Un publicador no DEBERÍA invocar ningún otro método de un suscriptor después de haber invocado sus métodos onError u onComplete.

8. Un publicador DEBE dejar de invocar eventualmente métodos de un suscriptor si su suscripción ha sido cancelada. No obstante, debido a la naturaleza asíncrona del proceso, y la demora entre la petición de cancelación y su cancelación efectiva, podría llegar a invocar alguno de sus métodos.

9. Un publicador DEBE, dentro la implementación de su método subscribe, invocar el método onSubscribe del suscriptor pasado como parámetro antes de invocar cualquier otro de método de dicho suscriptor. El método debe terminar normalmente, excepto si el suscriptor pasado como parámetro es nulo, en cuyo caso debe elevar una excepción de tipo NullPointerException. Cualquier otro tipo de error debe realizarse invocando el método onError del suscriptor.

10. Un publicador NO DEBE permitir la suscripción de un mismo suscriptor más de una vez.

11. Un publicador PUEDE decidir el número de distintos suscriptores que soporta a un mismo tiempo y si la implementación de las suscripciones la realiza de forma unicast (transmisión de uno a uno) o multicast (transmisión de uno a muchos).

2. Subscriber

1. Un suscriptor DEBE llamar al método request de Subscription para empezar a recibir elementos generados por la suscripción. Es decir, es responsabilidad del suscriptor indicar al publicador cuando quiere empezar a recibir y cuantos elementos está preparado para recibir.

2. Un suscriptor DEBERÍA procesar un elemento de forma asíncrona si estima que su tiempo de respuesta puede afectar la calidad de servicio del publicador.

3. Un suscriptor NO DEBE llamar a ningún método de Subscription ni Publisher dentro de sus métodos onComplete y onError para evitar ciclos y condiciones de carrera.

4. Un suscriptor DEBE considerar cancelada una suscripción cuando el publicador invoca su método onComplete u onError.

5. Un suscriptor DEBE llamar al método cancel de Subscription cuando un publicador invoca su método onSubscribe teniendo el suscriptor ya una suscripción activa.

6. Un suscriptor DEBE llamar al método cancel de Subscription  cuando la suscripción ya no sea necesaria con el objetivo de permitir liberar los recursos asociados a dicha suscripción.

7. Un suscriptor DEBE asegurar que todas las llamadas a los métodos de Subscription se realizan desde un mismo thread, o utilizando algún método de sincronización en caso de que la suscripción se esté usando concurrentemente desde distintos threads.

8. Un suscriptor DEBE estar preparado para recibir elementos a través de su método onNext, aún después de haber llamado al método cancel de Subscription, debido a la naturaleza asíncrona del proceso y la demora que se produce entre que el suscriptor solicita la cancelación y el publicador la cancela realmente.

9. Un suscriptor DEBE estar preparado para recibir la finalización de la suscripción a través de su método onComplete, incluso antes de haber llamado al método request de Subscription, debido a que el publicador tiene potestad para dar por terminada una suscripción.

10. Un suscriptor DEBE estar preparado para recibir la finalización de la suscripción a través de su método onError, incluso antes de haber llamado al método request de Subscription, debido a errores en el publicador.

11. Un suscriptor DEBE asegurar que el procesamiento asíncrono de sus métodos onSubscribe, onNext, onComplete y onError se realiza de manera thread-safe.

12. Un mismo suscriptor DEBE suscribirse a un mismo publicador una única vez.

13. Un suscriptor DEBE terminar las invocaciones a sus métodos onSubscribe, onNext, onComplete y onError sin elevar ninguna excepción. Excepto cuando algún parámetro recibido sea nulo, en cuyo caso deberá elevar una excepción de tipo NullPointerException. Cualquier otro error debe resolverse cancelando la suscripción. El publicador no puede tratar los errores del suscriptor, por lo que dará por cancelada la subscripción en caso de producirse alguno.

3. Subscription

1. Una suscripción es el único componente a través del que se establece la comunicación entre un suscriptor y un publicador. Un objeto Subscription concreto pertenece de forma exclusiva a un Publisher y un Subscriber concretos. Sólo el publicador DEBE llamar a a los métodos request y cancel  de Subscription.

2. Una suscripción DEBE permitir que se llame a su método request desde los métodos onSuscribe y onNext de un suscriptor de manera síncrona. Es decir, la implementación del método request debe ser reentrante.

3. Una suscripción DEBE establecer un número máximo de llamadas recursivas entre los métodos request de Subscriber y onNext de Subscription. La recomendación a este respecto es limitar a uno el número de llamadas recursivas evitando la secuencia request -> onNext -> request -> onNext -> ...

4. Una suscripción DEBERÍA implementar su método request de forma que retorne lo antes posible para no afectar la calidad de servicio del suscriptor que lo invoque.

5. Una suscripción DEBE implementar su método cancel de forma idempotente, thread-safe y retornar lo antes posible para no afectar la calidad del servicio del suscriptor que lo invoque.

6. Una suscripción NO DEBE realizar ninguna acción cuando se llame a su método request una vez que la suscripción haya sido cancelada.

7. Una suscripción NO DEBE realizar ninguna acción cuando se llame a su método cancel una vez que la suscripción haya sido cancelada.

8. Una suscripción DEBE almacenar la suma de todas las cantidades de elementos solicitadas por todas las llamadas realizadas por el suscriptor a su método request.

9. Una suscripción DEBE invocar al método onError del suscriptor con una instancia de una excepción IllegalArgumentException si se solicita una cantidad de elementos igual o menor que cero en alguna llamada realizada por el suscriptor a su método request.

10. Un suscripción PODRÍA implementar en su método request invocaciones al método onNext del suscriptor de manera síncrona. Siempre y cuando la suscripción no haya sido ya cancelada.

11. Un suscripción PODRÍA implementar en su método request invocaciones al método onComplete del suscriptor de manera síncrona. Siempre y cuando la suscripción no haya sido ya cancelada.

12. Una suscripción DEBE requerir al Publisher que deje de invocar los métodos del Subscriber cuando el suscriptor llame a su método cancel . Siempre y cuando la suscripción no haya sido ya cancelada.

13. Una suscripción DEBE requerir del Publisher que libere cualquier referencia al Subscriber  cuando el suscriptor llame a su método cancel. Siempre y cuando la suscripción no haya sido cancelada.

14. Una suscripción PODRÍA causar que el Publisher pase a un estado de finalizado cuando el suscriptor llame a su método cancel. Siempre y cuando la suscripción no haya sido ya cancelada.

15. Una suscripción NO DEBE elevar excepciones en la implementación de su método cancel .

16. Una suscripción NO DEBE elevar excepciones en la implementación de su método request.

17. Una suscripción DEBE permitir peticiones de una cantidad de elementos potencialmente infinita. Una petición de 2^63-1 ( java.lang.Long.MAX_VALUE) o más elementos se considera como una petición de una cantidad infinita de elementos.

4. Processor

1. Un procesador DEBE cumplir tanto los requerimientos establecidos para los publicadores como para los suscriptores.

2. Un procesador PODRÍA elegir recuperarse de una invocación a su método onError por parte de un publicador, pero en ese caso DEBE considerar la suscripción cancelada, en caso contrario DEBE propagar el error inmediatamente invocando el método onError de sus suscriptores.

Reactive Streams en Java (1)

Java 9 introdujo la clase java.util.concurrent.Flow en un intento de estandarizar de forma nativa la implementación de reactive streams. Una propuesta de diseño que propugna la creación de streams asíncronos y no bloqueantes, prestando especial atención a que la fuente origen de la información, potencialmente infinita, no sature al destinatario.

Los reactive streams asemejan al patrón Observer, con un modelo de publicadores y suscriptores. Los publicadores generan una secuencia de elementos que envían a los suscriptores, pero teniendo además estos últimos la capacidad de limitar el número de elementos que quieren recibir.

Java ha añadido unas interfaces con el mínimo número de métodos necesarios para implementar reactive streams, pero no una vasta colección de implementaciones de dichas interfaces. Lo que es importante, es que con la creación de todas estas interfaces dentro del propio JDK se unifican por parte de Java las distintas propuestas existentes actualmente para la implementación de reactive streams. Es de esperar que en un futuro todas las librerías de terceros utilicen estas interfaces en vez de crear las suyas propias.

Flow.Publisher<T>

La interface funcional Flow.Publisher<T> representa los publicadores.

El método subscribe permite que los suscriptores se suscriban a un publicador. El método recibe una instancia de Subscriber a través de la que el publicador se comunicará con el suscriptor. El publicador debe elevar una excepción de tipo NullPointerException si la instancia de Subscriber pasada como parámetro es nula.

Flow.Subscriber<T>

La interface Flow.Subscriber<T> representa los suscriptores. Es utilizada por los publicadores para notificar eventos a los suscriptores.

El método onSubscribe es llamado por el publicador después de que el suscriptor llame al método suscribe del publicador y este último acepte la suscripción. Recibe una instancia de Subscription a través de la que el suscriptor podrá controlar la suscripción.

El método onNext es llamado por el publicador para pasar al suscriptor los elementos generados por la suscripción.

El método onComplete es llamado por el publicador cuando la suscripción termina, ya sea porque se ha consumido el número de elementos solicitado por el suscriptor, o el publicador ha consumido totalmente los elementos de su fuente origen de la información.

El método onError es llamado por el publicador cuando se produce un error. Después de llamar a este método la suscripción se considera terminada y ningún otro método será invocado. Notar que no se lanza una excepción en caso de error, sino que el publicador pasa una instancia de una excepción al suscriptor. Por ejemplo, este método es invocado con una excepción de tipo IllegalStateException cuando un suscriptor intenta suscribirse y ya está suscrito, o se produce un error de cualquier otro tipo durante el proceso de suscripción.

De forma general el comportamiento es indefinido si se produce un error y se eleva una excepción durante la invocación de cualquiera de estos métodos por parte del suscriptor.

Flow.Subscription

La interface Flow.Subscription representa las suscripciones. Es utilizada por los suscriptores para controlar la suscripción.

El método request es llamado por el suscriptor para notificar al publicador que está en disposición de recibir elementos de la suscripción. Se puede llamar tantas veces como se quiera. El parámetro indica el número de elementos que está en disposición de recibir además de los ya solicitados. Si se solicita cero o menos elementos el publicador llamará al método onError del suscriptor con una instancia de la excepción IllegalArgumentException. Si se solicita Long.MAX_VALUE el publicador interpretará que el suscriptor quiere recibir todos los elementos generados por la suscripción sin ningún tipo de restricción en cuanto a su volumen, potencialmente infinito.

El método cancel es llamado por el suscriptor para solicitar al publicador que cancele la suscripción. Después de llamar este método, ni onComplete ni onError serán invocados por el publicador, pero onNext si puede llegar a ser invocado por el publicador debido a la naturaleza asíncrona de todo el proceso.

Flow.Processor<T, R>

La interface Flow.Processor<T, R> representa un componente que se comporta tanto como un publicador como un suscriptor.

El propósito principal de esta interface es la de ser implementada por componentes intermedios que se dediquen a realizar algún tipo de transformación sobre los datos recibidos de un publicador y publicarlos a su vez transformados para otros componentes.

Para terminar, es interesante notar el cuidado diseño de todas estas interfaces. Todas ellas establecen mecanismos de comunicación entre dos componentes en una única dirección. Todos los métodos tienen void como tipo retornado y admiten a lo sumo un parámetro. Y los objetos se intercambian entre los distintos componentes mediante invocaciones a métodos.

jshell, un REPL para Java

jshell es una herramienta de línea de comandos introducida en Java 9. Permite ejecutar código de una manera sencilla sin necesidad de un IDE, aunque no es un sustituto de estos. Se arranca, se escribe la sentencia en Java que se quiere ejecutar, y automáticamente se muestra el resultado de la ejecución. Es decir, jshell implementa el clásico patrón REPL (Read-Evaluate-Print Loop).

Es una herramienta bastante útil cuando se está escribiendo determinado tipo de código. Lo habitual es tenerla abierta en una consola aparte e ir probando en ella pequeños snippets de código a medida que se desarrolla. Esto permite tener un feedback inmediato en vez de esperar a compilar y ejecutar toda la aplicación en el IDE. Es útil sobre todo para probar variantes de un mismo código, experimentar con una librería con la que no se tenga demasiada experiencia, evaluar una secuencia larga de métodos concatenados, o ejecutar código copiado directamente de un sitio web.

jshell se encuentra en el subdirectorio bin del directorio donde se encuentre instalada la máquina virtual de Java. Se ejecuta arrancando directamente el programa desde línea de comandos:

jshell admite el acostumbrado parámetro -help, que muestra todas las opciones disponibles, y el clásico parámetro -v (verbose), que muestra información más detallada durante la ejecución. Parámetros ambos que merece la pena utilizar, al menos las primeras veces que se utilice la herramienta.

Una vez arrancado jshell se puede ejecutar código Java directamente en la propia línea de comandos. Por ejemplo, se puede definir una variable y asignarle un valor:

Como se observa, a pesar de ser Java, no es necesario poner punto y coma al final de la línea. La variable es automáticamente instanciada e inicializada. Se puede comprobar su valor escribiendo una expresión que haga referencia a ella:

La expresión es evaluada y el valor resultante es automáticamente impreso. Lo interesante es que la expresión puede ser todo lo compleja que se quiera:

Cuando jshell evalúa una expresión le asigna un identificador al resultado. En el ejemplo anterior, el resultado se asigna a una variable de nombre $3. Esta variable se puede referenciar y utilizar de manera independiente, o dentro de una expresión, lo que a su vez creará una nueva variable:

Además de variables, también se pueden definir clases y métodos:

jshell evalúa las expresiones cuando se pulsa retorno de carro. Si la expresión es válida, pero no está completa, entonces las siguientes líneas formarán también parte de la expresión en curso, tantas como sean necesarias para completarla.

Las clases y métodos definidos se pueden referenciar dentro de expresiones:

Si se quiere cambiar una definición sólo hay que volver a declararla, la nueva declaración sobrescribirá la anterior:

jshell es un editor de líneas que permite moverse por ellas con las teclas de cursor, tiene teclas de acceso rápido para algunas acciones de edición comunes en la mayoría de editores, algunas características básicas como un histórico de las líneas introducidas, y una potente opción de autocompletar.

El autocompletar más básico se obtiene pulsando la tecla de tabulación a medida que se escribe. Esto hace que se muestren todas las opciones disponibles que casen con la expresión en curso, o directamente se complete la expresión si sólo hay una opción disponible. Dependiendo del contexto, volviendo a pulsar tabulador, se muestra información adicional, como por ejemplo la documentación si está disponible:

Otras opciones de autocompletar se ejecutan pulsando shift junto con el tabulador. En ese punto el editor se queda esperando que se pulse una tecla. La v sirve para crear una variable con la expresión en curso, la m para crear un método, y la i para importar una clase.

Por defecto están disponibles paquetes del módulo base de Java, pero se pueden importar paquetes de librerías de terceros que se encuentren en el classpath:

jshell además de declaraciones y expresiones admite comandos. Los comandos se utilizan para mostrar información y controlar el entorno de ejecución. Para diferenciarlos de las expresiones, todos los comandos se escriben empezando por el carácter barra (/). Sirven para listar el historial (/history), las clases (/types), métodos (/methods), variables (/vars) y paquetes disponibles (/imports), volver a ejecutar un snippet (/!), utilizar un editor externo (/edit), y realizar muchas otras tareas. Es recomendable echar un vistazo a la ayuda para ver todos los comandos disponibles.

Un último comando que hay que conocer es /exit, que sirve para cerrar jshell.

En definitiva, una herramienta muy útil de la que siempre han presumido muchos otros lenguajes, sobre todo los de scripts, y que ahora también está disponible de forma nativa para Java.

Interfaces Funcionales en Java

Por definición, una interface funcional es aquella que tiene un único método abstracto. Así de sencillo. No importa que además tenga métodos estáticos o implementados por defecto.

Java 8 añadió la anotación @FunctionalInterface para marcar este tipo de interfaces. Es una anotación puramente informativa, usada para reforzar la intención de las interfaces, pero que permite a los compiladores generar un error si se aplica sobre una interface que no cumpla con la definición de interface funcional, o sobre una clase, enumerado u otra anotación.

Las interfaces funcionales se pueden instanciar con expresiones lambda, referencias a métodos o referencias a constructores. Todos ellos conceptos añadidos en Java 8.

Como se observa, la declaración de la interface no tiene que estar relacionada de manera directa con la expresión que la instancia. Ni siquiera tiene que aparecer el nombre del método funcional. Lo único que debe cumplirse es que la expresión cumpla con la interface, es decir que la expresión lambda, o la referencia al método o constructor tenga el mismo número de argumentos y tipo retornado.

Java 8 añadió una gran cantidad de interfaces funcionales dentro del paquete java.util.function, tanto para uso interno como para su uso general. Las interfaces principales son Consumer<T>, Supplier<T>, Function<T, R> y Predicate<T>, el resto son especializaciones de estas.

La nomenclatura utilizada para el nombre de las clases especializadas indica cómo se diferencian de la interfaces base que especializan:

  • El prefijo Bi indica que admite dos parámetros.
  • El prefijo Boolean, Int, Long y Double indica que admite un parámetro o genera un resultado de tipo boolean, int, long o double respectivamente.
  • El prefijo ObjInt, ObjLong y ObjDouble indica que admite dos parámetros, el primero del mismo tipo que la interface, y el segundo de tipo int, long o double respectivamente.
  • El prefijo ToInt, ToLong y ToDouble indica que produce un resultado de tipo int, long o double respectivamente.
  • El prefijo IntToLong, IntToDouble, LongToInt, LongToDouble, DoubleToInt y DoubleToLong indica que admite un parámetro del primer tipo indicado y produce un resultado del segundo.
  • El sufijo Operator indica una especialización de Function.
Consumer<T>

Consumer<T> representa una función con un argumento que no retorna nada. Tal y como su nombre indica, consume un valor y no genera nada. Un ejemplo de este tipo de funciones es el método de impresión por la salida estándar, que admite el texto a imprimir y no retorna nada:

La interface Consumer<T> ofrece el método funcional accept, e implementa por defecto el método andThen que permite concatenar varias acciones en secuencia.

Notar que la interface Consumer<T> permite trabajar en base a efectos laterales provocados al modificar el estado de los objetos consumidos, si estos no son inmutables, para que la siguiente operación indicada en el método andThen tenga sentido.

Interfaces especializadas de Consumer<T>:

Supplier<T>

Supplier<T> representa una función sin argumentos que retorna un resultado. Tal y como su nombre indica, genera un valor. Un ejemplo de este tipo de funciones es el método de generación de números aleatoriosde Math, que no requiere argumentos:

La interface Supplier<T> ofrece el método funcional get y no implementa ningún otro método.

Interfaces especializadas de Supplier<T>:

Function<T, R>

Function<T, R> representa una función con un argumento que retorna un resultado. Representa la expresión más habitual de lo que se entiende por función. Un ejemplo de este tipo de funciones es el método que convierte una cadena de texto en mayúsculas:

La interface Function<T, R> ofrece el método funcional apply y el método estático identity que retorna una función que devuelve el propio parámetro que se le pasa. E implementa por defecto el método compose, que aplica una función dada al parámetro pasado al método funcional y después ejecuta el método funcional sobre el resultado, y andThen, que permite concatenar varias acciones en secuencia.

Interfaces especializadas de Function<T, R> :

Predicate<T>

Predicate<T> representa una función con un argumento que retorna un boolean. Tal y como su nombre indica, evalúa una expresión y retorna true o false para indicar si se cumple o no la expresión. Un ejemplo de este tipo de funciones es el método que comprueba si una carácter es un espacio en blanco o no:

La interface Predicate<T> ofrece el método funcional test y el método estático isEqual , que retorna un predicado que compara los dos parámetros que se le pasan con el método Objects.equals. E implementa por defecto los métodos and, or y negate, para la composición y negación de predicados.

Interfaces especializadas de Predicate<T>:

Además de esta interfaces funcionales de uso genérico, Java 8 añadió interfaces especializadas en distintos paquetes para cubrir necesidades concretas, pero las cuatro básicas son las importantes porque explica el funcionamiento de todas las demás.

Streams en Java

Un stream es una representación abstracta de una secuencia de elementos. Su propósito principal es facilitar la ejecución de operaciones sobre dicha secuencia. Operaciones individuales sobre cada elemento, sobre un subconjunto de ellos, o sobre la totalidad de los mismos. Ejecutadas de forma secuencial o paralela. Evaluadas normalmente de forma perezosa. Y con el propósito de modificar los elementos de la secuencia, obtener una nueva secuencia, extraer un elemento individual, o calcular un valor agregado.

Los streams se diferencian de las colecciones clásicas de Java en varios aspectos:

  • No son almacenes de datos. Un stream no es una estructura de datos que almacena información. Obtiene un dato de una fuente y lo pasa a través de una cadena de operaciones.
  • Están orientados a la programación funcional. Un stream no modifica la fuente de la que obtiene datos. No basa su funcionamiento interno en efectos laterales.
  • Promueven la inicialización y evaluación perezosa (lazy). Un stream implementa todas sus operaciones de forma perezosa siempre que sea posible. Un elemento es visitado sólo una vez.
  • No tienen un tamaño fijo predeterminado. Un stream puede alimentarse de una fuente de datos potencialmente infinita.

Los streams fueron introducidos en Java 8 con el paquete java.util.stream a través de la interface genérica Stream<T>, y las interfaces especializadas IntStream, LongStream y DoubleStream que operan sobre las primitivas int, long y double respectivamente.

Un stream puede crearse de muchas formas, por ejemplo con las factorías Stream.empty o Stream.of:

O a partir de un array ya existente con el método Arrays.stream:

O a partir de una colección ya existente con los métodos stream y parallelStream proporcionados por las propias colecciones:

O con factorías que generan ellas mismas una secuencia de elementos, de un tamaño determinado como IntStream.rangeClosed, o potencialmente infinita como Random.ints:

O con métodos de clases de dominios específicos. como String.chars o Files.list:

Las operaciones a realizar sobre un stream se definen de forma habitual concatenándolas en un estilo fluent formando un pipeline con los métodos de la propia interface Stream<T>. Pudiendo ser estas operaciones ser intermedias o terminales. Las intermedias son siempre evaluadas de forma perezosa, se espera hasta alcanzar una terminal para evaluarlas. Las terminales provocan que se consuman elementos de la fuente del stream y generan un resultado o efecto lateral.

La mayoría de los métodos de la interface Stream<T> admiten una interface funcional como parámetro. Lo que en la práctica se expresa como una función lambda, referencia a un método o constructor. Por ejemplo, el método filter permite filtrar elementos de la fuente de un stream. Por lo que para filtrar los números pares de un stream de enteros basta con crear una función que diga cuando un número se considera par:

El método filter es un ejemplo de operación intermedia, de igual forma que takewhile, dropWhile, skip o distinct, por citar algunas:

Por su parte, un ejemplo de operación terminal es map, que aplica una función sobre cada elemento del stream y genera un nuevo stream con los elementos resultantes, o flatMap, que por cada elemento puede generar cero, uno, o más elementos:

Una operación similar es forEach, que aplica una función sobre cada elemento del stream pero no retorna nada:

Ejemplos de operaciones terminales que reducen un stream a un único valor son min, max, count o findFirst:

Otro conjunto de operaciones muy utilizadas son aquellas que reducen un stream a un array o una colección. La reducción a un array se realiza con toArray, mientras que la reducción a una colección se realiza con toCollect, que utiliza una familia de funciones de la clase Collectors:

La clase Collectors tiene una gama muy amplia de métodos que permiten agrupar elementos de una forma muy variada simplificando sobremanera el código. Es altamente recomendable revisar su API, junto con la de Stream<T>, para profundizar en las posibilidades ofrecidas por estas y detalles técnicos como el recorrido de colecciones ordenadas.

Utilizando streams se simplifica el código, se mejora la intencionalidad de las interfaces, y se facilita el desarrollo orientado a pruebas.

Los streams en Java ofrecen de forma nativa muchas más opciones, como la ejecución en paralelo por ejemplo, muy útiles para la implementación del patrón map-reduce sobre una cantidad elevada de datos. Desde su introducción en Java 8 han ganado mucha popularidad e introducido la programación funcional de forma natural en el ecosistema de Java.