28. März 2025

Java: Stream Gatherers mit Version 24

Seit ihrer Einführung in Java 8 hat sich die Stream-API zu einem unverzichtbaren Werkzeug für funktionale Programmierung in Java entwickelt. Mit jeder neuen Java-Version wird die API weiter ausgebaut und verfeinert. Java 24 bringt nun eine spannende Neuerung mit sich: Stream Gatherers. In diesem Beitrag zeigt David Simmen anhand eines praktischen Beispiels, wie Gatherers funktionieren, wie sie implementiert werden – und warum sie ein echtes Highlight der neuen Version sind.

Software Development & Architecture
Java 24 Stream Gatherers Beitragsbild zu Blogpost

Einführung in die Stream-API

Die Java Streams API ermöglicht es, umfangreiche Codeblöcke durch funktionale Programmierung deutlich zu vereinfachen und lesbarer zu gestalten. Ein Stream beginnt stets mit einer Quelle, auf die optional eine oder mehrere verkettete Zwischenoperationen folgen, bevor eine abschliessende Terminal-Operation den Stream verarbeitet. Für diese Terminal-Operationen stehen zahlreiche Methoden zur Verfügung. Darunter etwa toList() für häufige Anwendungsfälle sowie die flexiblere Methode collect(), mit der sich unterschiedlichste Terminal-Operationen mithilfe von Collectorsumsetzen lassen.

Für die Zwischenoperationen (Intermediate Operations) fehlte bisher eine vergleichbare Flexibilität. Nehmen wir etwa eine Liste von Zahlen, aus der wir alle Elemente herausfiltern möchten, die kleiner sind als ein zuvor verarbeitetes Element. Bisher liess sich so ein Anwendungsfall nur mit individuell geschriebenem, oft umständlichem Code lösen:

final List<Integer> allIntegers = List.of(1, 3, 2, 4, 1, 5, 2, 6);
int state = 0;
final List<Integer> result = new ArrayList<>();
for (Integer element : allIntegers) {
    if (element > state) {
        state = element;
        result.add(element);
    }
}

Gatherer und die neue gather()-Methode

Möchten wir stattdessen die neue gather()-Methode nutzen, benötigen wir zunächst einen sogenannten Gatherer. Ähnlich wie die bekannte Collectors-Hilfsklasse, die eine Vielzahl vordefinierter Collector-Implementierungen bereitstellt, bietet auch die neue Gatherers-Hilfsklasse eine Sammlung gebrauchsfertiger Gatherer. Für unseren Anwendungsfall steht allerdings kein passender vordefinierter Gatherer zur Verfügung – daher müssen wir eine eigene Implementierung schreiben:

public class Increasing implements Gatherer<Integer, AtomicInteger, Integer> {

    public Supplier<AtomicInteger> initializer() {
        return AtomicInteger::new;
    }

    public Integrator<AtomicInteger, Integer, Integer> integrator() {
        return new IncreasingIntegrator();
    }

    private static class IncreasingIntegrator implements Integrator<AtomicInteger, Integer, Integer> {

        public boolean integrate(AtomicInteger state, Integer element, Downstream<? super Integer> downstream) {
            if (element > state.get()) {
                state.set(element);
                downstream.push(element);
            }
            return true;
        }
    }
}

Ein Gatherer muss mindestens die Methode integrator() implementieren, die einen sogenannten Integrator zurückliefert. Dieser Integrator übernimmt die eigentliche Verarbeitung der Elemente über die Methode integrate(). Dabei werden die Stream-Elemente so lange verarbeitet, bis entweder keine weiteren vorhanden sind oder integrate() den Wert false zurückgibt. Zusätzlich arbeitet der Gatherer mit einem internen Zustand (State), der über die Methode initializer() für jeden Stream individuell erzeugt wird. In unserem Beispiel verwenden wir zur Vereinfachung einen AtomicInteger als State.

Mit diesem Gatherer lässt sich der Code nun wie folgt vereinfachen:

final List<Integer> result2 = allIntegers.stream()
                .gather(new Increasing())
                .toList();

Die neue gather()-Methode trägt wesentlich zur Vereinfachung des Codes bei, indem sie die gewünschte Logik in einen wiederverwendbaren Gatherer auslagert.

Parallele Verarbeitung mit Gatherern

Gatherer lassen sich zudem parallel ausführen – vorausgesetzt, die Methode combiner() ist implementiert. Der zurückgegebene Combiner sorgt dafür, dass zwei Zustände (States) zusammengeführt werden können. Das bedeutet jedoch auch, dass parallele Aufrufe von integrate() unabhängig voneinander arbeiten müssen, da sie keine vollständige Sicht auf den Gesamtzustand haben dürfen. Erst nach dem Zusammenführen der Zustände kann der finale Zustand über die Methode finisher() ausgewertet werden.
Ein einfaches Beispiel hierfür ist ein Gatherer, der zählt, wie viele Elemente verarbeitet wurden:

public class CountingGatherer<T> implements Gatherer<T, AtomicInteger, T> {

    private final Function<T,T> function;

    public CountingGatherer(Function<T, T> function) {
        this.function = function;
    }

    public Supplier<AtomicInteger> initializer() {
        return AtomicInteger::new;
    }

    public Integrator<AtomicInteger, T, T> integrator() {
        return (state, element, downstream) -> {
                state.incrementAndGet();
                downstream.push(function.apply(element));
                return true;
        };
    }

    public BinaryOperator<AtomicInteger> combiner() {
        return (a,b)->new AtomicInteger(a.get()+b.get());
    }

    public BiConsumer<AtomicInteger, Downstream<? super T>> finisher() {
        return (s, d)-> System.out.println(s.get()+" Elements processed");
    }
}

Fazit

Die neue gather()-Methode erweitert die Stream-API um ein lang ersehntes Feature: benutzerdefinierte, wiederverwendbare Zwischenoperationen. Sie ermöglicht nicht nur elegantere Lösungen, sondern eröffnet auch neue Wege für die parallele Verarbeitung komplexer Logik.

Es ist nur eine Frage der Zeit, bis erste Third-Party-Utility Libraries eine Sammlung praxisnaher Gatherer bereitstellen – ähnlich wie es Collectors schon lange tun.

Jetzt ist der perfekte Zeitpunkt, selbst mit Gatherers zu experimentieren und eigene Bausteine für wiederverwendbare Stream-Logik zu entwickeln!

Weitere Dokumentation

JEP 485
Gatherer
Integrator
Gatherers