Java Goes Parallel

Um die Leistungsfähigkeit moderner Multicore-Rechner auszuschöpfen, müssen wir Software heute so entwickeln, dass die zur Verfügung stehenden Rechenkerne optimal eingesetzt werden. Die parallele Programmierung, d. h. der Umgang mit Threads und Co. erhält somit Einzug in die Anwendungsentwicklung. Für die Entwicklung paralleler Software benötigt man aber sehr spezifisches Know-how, das im Allgemeinen nur wenige Experten haben. Bei Java wurden mit der Version 8 verschiedene Frameworks eingeführt, mit deren Hilfe man nun parallele Anwendungen einfacher entwickelt kann, ohne in die Tiefen der nebenläufigen Programmierung einsteigen zu müssen. Beim Einsatz dieser Frameworks müssen allerdings auch verschiedene Regeln beachtet werden. Die Parallelisierung erhält man auch hier nicht zum Nulltarif.
Bei Java wurde die Möglichkeit der nebenläufigen und somit parallelen Programmierung direkt im Sprachstandard verankert. Diese Standardkonzepte sind aber sehr rudimentär und mit vielen Limitierungen behaftet. Für unabhängige Tätigkeiten kann man zwar einfach verschiedene Threads starten, sobald sie aber gemeinsam eine Aufgabe erledigen sollen, müssen die gestarteten Aktivitäten mühsam verwaltet und koordiniert werden. Mit dem Aufkommen der Multicore-Rechner wurde deshalb eine Concurrency-Bibliothek (Paket java.util.concurrency) eingeführt, die viele nützliche abstrakte Konstruktionen zur Steuerung und Synchronisation von Threads bereitstellt. Neben Threadpools und Atomic-Klassen sind dies insbesondere Synchronisationskonzepte, wie verschiedene Locks, Barrieren und Semaphoren. Mit all diesen Konzepten lassen sich nun zwar parallele Abläufe einfacher implementieren, es ist aber immer noch viel Handarbeit notwendig. So muss z. B. bei einem "Parallel-For" der Datenbereich explizit aufgeteilt und die Teilergebnisse explizit eingesammelt und verarbeitet werden. Das sind alles Arbeiten, die nicht zur eigentlich umzusetzenden Fachlogik gehören und darüber hinaus fehleranfällig sind. Nachdem man bei Java 7 mit dem ForkJoin-Framework einen ersten Schritt in Richtung "vereinfachter Parallelisierung" gegangen ist, wurde bei Java 8 dieser Weg konsequent weiter verfolgt. Mit den Möglichkeiten der parallelen Array-Verarbeitung, den parallel Streams und der CompletableFuture-Klasse kann ein Entwickler nun leichter Parallelität implementieren, ohne in die Tiefen von Speicher- bzw. Threadmanagement und Synchronisation einzutauchen. In diesem Artikel schauen wir uns diese Parallelisierungsmöglichkeiten genauer an und zeigen, wie und wann man sie gewinnbringend einsetzen kann. Es gilt nämlich auch hier, dass man nicht in jedem Fall eine Beschleunigung seiner Programme erhält und schon gar nicht immer gratis.
Parallele Array-Verarbeitung
Die Klasse Arrays bietet für einige Standardfälle eine Parallelverarbeitung an: Für das Befüllen, das Sortieren und die Berechnung eines Prefixes. Hierzu stehen verschiedene Überladungen der Methoden parallelSetAll, parallelSort und parallelPrefix zur Verfügung. Codebeispiel 1 zeigt die Verwendung. Zuerst wird hier ein int-Array erzeugt und parallel mit Zufallszahlen gefüllt. Danach wird es sortiert und die additive Prefixsumme berechnet. Da das Befüllen parallel ausgeführt wird, benötigen jetzt mehrere Threads gleichzeitig Zugriff auf einen Zufallszahlengenerator. Das hat zur Konsequenz, dass man hier nicht einfach ein gewöhnliches Random-Objekt benutzen sollte. Der Zugriff auf ein Random-Objekt ist zwar "threadsicher", aber konkurrierende Zugriffe werden beim Zugriff auf das Objekt intern sequentialisiert und behindern somit den parallelen Ablauf. Java stellt für diesen Anwendungsfall deshalb die Klasse ThreadLocalRandom zur Verfügung, über die jeder Thread Zugriff auf einen eigenen Zufallsgenerator erhält. Man sieht bereits an diesem einfachen Beispiel, dass man die Parallelisierung nicht unbedarft einsetzen darf. Allgemein gilt, dass die Anweisungen für die parallel ablaufenden Threads, in der Regel Lambda-Ausdrücke, seiteneffektfrei sein müssen, d. h. sie dürfen sich nicht gegenseitig in die Quere kommen. Seiteneffekte werden in der Regel vermieden, indem die Lambda-Ausdrücke nur lokale Variable, zustandslose Methoden oder nichtveränderbare Objekte benutzen. Die Benutzung von synchronisierenden bzw. sequentialisierenden Methoden ist zwar auch unkritisch, wirkt sich aber negativ auf die Parallelisierung aus.
Codebeispiel 1: Parallele Array-Verarbeitung
final int size = 1_000_000;
int[] array = new int[size];
Arrays.parallelSetAll(array, i -> ThreadLocalRandom.current().nextInt(100));
Arrays.parallelSort(array);
Arrays.parallelPrefix(array, (i,j) -> i + j );
An diesem Beispiel sieht man auch die Eleganz der Formulierung. Der Entwickler muss sich überhaupt keine Gedanken über die Threaderzeugung oder -verwaltung machen. Zur Umsetzung der Parallelverarbeitung wird intern das ForkJoin-Framework verwendet, das bereits von Java 7 eingeführt und in Java 8 nochmal verbessert wurde. Das ForkJoin-Framework teilt hier das zugrundeliegende Array rekursiv, bis zu einer systemabhängigen Tiefe, in Teilbereiche auf (Fork-Phase), die dann parallel bearbeitet werden (Work-Phase). Anschließend werden die Teilergebnisse wieder zusammengefügt (Join-Phase). In Abb.1 ist der Prozess schematisch dargestellt.
Streams – Ein allgemeines Konzept für parallele Datenverarbeitung
Mit Java 8 wurde ein neues Verarbeitungsparadigma für Datensammlungen eingeführt, die sogenannten Streams. Ein Stream wird in der Regel auf Grundlage einer Datenquelle, wie z. B. einer Collection oder einer Datei erzeugt. Danach lassen sich verschiedene Operationen, wie z. B. Filter oder Transformationsfunktionen anwenden, bis der Stream mit einer terminalen Operation abgeschlossen bzw. ausgewertet wird (vgl. Abb.2). Man muss hierbei beachten, dass erst die terminale Operation die Verarbeitung startet und dass dabei die zugrundeliegende Datenquelle selbst nie verändert wird.
Der Clou bei der Sache ist nun, dass die Verarbeitung einfach automatisch parallelisiert werden kann, in dem man statt eines gewöhnlichen Streams einen parallelen Stream benutzt.
Codebeispiel 2 zeigt, wie man aus einem int-Array einen parallelen Stream (vom Typ IntStream) erzeugt und die kleinste gerade Zahl bestimmt. Der Lambda-Ausdruck (vom Typ Predicate) für den Filter mustert die ungeraden Zahlen aus und min bestimmt die kleinste Zahl von den verbleibenden. Die Auswertung liefert ein OptionalInt, da ja die Möglichkeit besteht, dass gar kein Minimum gefunden wird, falls z. B. das Array nur ungerade Zahlen enthält.
Codebeispiel 2: Array-Verarbeitung mit Streams
int[] array = ...;
OptionalInt min = Arrays.stream(array)
.parallel()
.filter( i -> i%2 == 0 )
.min();
min.ifPresent( i-> System.out.println("Minimum " + i ) );
Auch hier muss man sich keine Gedanken über Threaderzeugung und -verwaltung machen. Bei der Realisierung der parallelen Streamverarbeitung wird wie im vorherigen Beispiel intern das ForkJoin-Framework benutzt. Die Datenquelle wird rekursiv in Teilbereiche aufgeteilt (man spricht hier von der Split-Phase), die Verarbeitung parallel auf den Teilbereichen durchgeführt und anschließend das Ergebnis aus den Teilergebnissen ermittelt (man spricht hier von der Reduce- bzw. Collect-Phase). Abb.3 illustriert schematisch den Prozess.
Ob sich nun eine parallele Streamverarbeitung lohnt, hängt von vielen Faktoren ab. Die Wichtigsten sind:
- Die Größe, d. h. die Anzahl der Daten. Sind nur wenige Daten zu bearbeiten, lohnt sich der Aufwand des Aufteilens des Datenbereichs und des Zusammenfügens der Teilergebnisse nicht.
- Der Aufwand der einzelnen Verarbeitungsschritte. Besteht die Arbeit der Threads nur aus kurzen und schnell zu erledigenden Arbeitsschritten, so lohnt sich in vielen Fällen eine Parallelisierung auch nicht.
Als Faustregel kann hier die NQ-Formel angewendet werden. Wenn N*Q > 10.000 ist, dann kann eine Parallelverarbeitung den Ablauf beschleunigen. In der Formel entspricht N der Anzahl der Datenelemente und Q dem Aufwand der Verarbeitung für ein Datenelement. Im Zweifel helfen entsprechende Benchmarkmessungen, um die richtige Entscheidung zu treffen.
Die Anwendung der NQ-Formel alleine reicht aber noch nicht aus. Es gibt noch zwei weitere wichtige Punkte, die beachtet werden müssen. Nämlich, ob sich die Datenquelle effizient aufteilen lässt und ob sich die Teilergebnisse effizient zusammenführen lassen.
Schauen wir uns zuerst den ersten Punkt, das Aufteilen der Datenquelle, etwas näher an. Hierzu betrachten wir als Datenquellen diesmal Collections, nämlich die beiden List-Implementierungen ArrayList und LinkedList. Bei einer ArrayList kann sehr schnell auf eine beliebige Elementposition zugegriffen werden, indem einfach die Speicheradresse ausgerechnet wird. Greift man dagegen bei einer LinkedList auf eine bestimmte Position zu, muss immer traversiert werden, d. h. man muss von vorne beginnend die Elemente durchzählen. Eine ArrayList lässt sich somit sehr effizient rekursiv in Teilbereiche aufteilen, eine LinkedList dagegen nicht. So ist z. B. die in Codebeispiel 3 gezeigte Streamverarbeitung basierend auf der ArrayList für große Collections deutlich schneller als die Verarbeitung auf der LinkedList (vgl. Abb.4).
Codebeispiel 3: Listen als Streamquellen
List<String> arrayList = new ArrayList<>();
List<String> linkedList = new LinkedList<>();
// Initialisiere Listen mit Zufallsstrings
long count1 = arrayList.parallelStream()
.filter( s -> s.contains("abcde"))
.count();
long count2 = linkedList.parallelStream()
.filter( s -> s.contains("abcde"))
.count();
Weitere Streams, die sich nicht sonderlich gut aufteilen lassen, sind z. B. solche, die aus einem Iterationsprozess erzeugt oder aus Dateien gewonnen werden. Sie alle haben keine wahlfreie Zugriffseigenschaft.

Beim Zusammensetzen der Teilergebnisse (terminale Operation bzw. Auswertung des Streams) müssen wir zwischen einer Reduktion und einer Sammlung unterscheiden. Bei einer Reduktion wird typischerweise ein Wert ermittelt, wie z. B. die Anzahl der verbleibenden Elemente im Codebeispiel 3. Solche Operationen sind in der Regel eher unkritisch. Wird eine neue Datensammlung erzeugt, müssen in der Collect-Phase Sammlungen zusammengefügt werden. Das Zusammensetzen von großen Listen ist hier eher unkompliziert, solange die Sortierung keine Rolle spielt. In dem Fall können ArrayLists einfach hintereinander kopiert bzw. LinkedLists einfach verkettet werden. Das Zusammensetzen von großen Sets oder Maps ist dagegen mit wesentlich mehr Aufwand verbunden, da hier die Elemente explizit in eine vorhandene Struktur eingefügt werden müssen. Bei dem im Codebeispiel 4 gezeigten Programm ist für große Ergebnislisten das parallele Sammeln in eine Liste deutlich schneller, wie das parallele Sammeln in eine Set, wobei natürliche die Ergebnisse nicht äquivalent sind, da eine Set keine doppelten Einträge zulässt. In dem Beispiel ist es sogar so, dass die gewöhnliche, nicht parallele Streamverabreitung beim Sammeln in einem Set doppelt so schnell ist wie ihre parallele Variante (vgl. Abb.5).
Codebeispiel 4: Streamverarbeitung mit Collectoren
List<String> arrayList = new ArrayList<>();
// Initialisiere Listen mit Zufallsstrings
List<String> list = arrayList.parallelStream()
.filter( s -> s.contains("a"))
.collect( Collectors.toList() );
Set<String> set = arrayList.parallelStream()
.filter( s -> s.contains("a"))
.collect( Collectors.toSet() );

Um den beiden zuletzt genannten Punkten entgegenzuwirken, kann sowohl in den Split- als auch in den Collect-Prozess durch das Bereitstellen von eigenen Klassen, sogenannten Spliteratoren bzw. Collectoren, eingegriffen werden. Diese sollten aber mit Bedacht eingesetzt werden, da man sich dadurch in die Tiefen der nebenläufigen Programmierung begibt, die einige Fallstricke beinhaltet.
Ein weiterer Punkt, den man im Auge behalten sollte, ist das Speicherlayout der Daten. Der Einsatz von Parallelisierung kann bei einem ungünstigen Speicherlayout unter Umständen zu vielen "Cache Misses" führen, so dass die Sicherstellung der Cache-Kohärenz zu einem Ausbremsen der Anwendung führt. Bei Java hat man nicht wirklich viele Möglichkeiten, diesem Effekt entgegen zu wirken. Hier kann aber die geplante Einführung von Value Types Abhilfe schaffen.
CompletableFuture – Ein mächtiges Werkzeug für Task-Parallelität
Alle bisher besprochenen Konzepte können für die parallele Verarbeitung von Datenmengen eingesetzt werden. Hierbei werden auf jedes Datenelement die gleichen Operationen angewendet, wobei diese unabhängig bzw. seiteneffektfrei sein müssen. Man spricht in diesem Fall von Datenparallelität. Neben dieser Datenparallelität unterstützt Java nun mit Hilfe der Klasse CompletableFuture auch die vereinfachte Implementierung von Task-Parallelität. Hierbei können verschiedene Aufgaben (Tasks) zu komplexen Abläufen verknüpft werden.
Die Klasse CompletableFuture kann als Erweiterung des Future-Patterns angesehen werden, das von Java 5 zusammen mit den Threadpools eingeführt wurde. Beim Future-Pattern wird ein Task (Aufgabe) in Form einer Callable-Klasse implementiert, dessen Instanz an einen Threadpool (Executor) übergeben und von diesem asynchron ausgeführt wird. Vom Threadpool erhält man bei der Übergabe ein Future-Objekt zurück, das stellvertretend für das Ergebnis des Tasks steht. Über dieses Stellvertreterobjekt kann zu einem späteren Zeitpunkt auf das Ergebnis zugegriffen werden (vgl. Abb.6).
Mit Hilfe des Future-Patterns kann lediglich immer nur ein Task asynchron ausgeführt werden. Wenn der Task zu Ende ist, muss aktiv das Ergebnis ausgelesen und dann unter Umständen der nächste asynchrone Verarbeitungsschritt gestartet werden. Die CompletableFuture-Klasse hebt diese Limitierung auf, in dem zusätzlich zum Future-API eine Schnittstelle für Task-Kompositionen (CompletionStage-Interface) bereitgestellt wird. Somit lassen sich komplexere Ablaufpfade spezifizieren, die dann nach Wahl nebenläufig in Threads aus einem Threadpool oder synchron vom aufrufenden Thread ausgeführt werden (vgl. Abb.7).
Wenn in einer solchen Ablaufkette ein Task beendet ist, werden der oder die anhängigen Tasks automatisch gestartet, wobei das Ergebnis immer an den Nachfolger übergeben wird. Hierbei können auch zwei Ergebnisse durch UND oder ODER verknüpft werden. Bei einer UND-Verknüpfung erhält der Nachfolgetask beide Ergebnisse der Vorgänger, bei einer ODER-Verknüpfung erhält er das Ergebnis, das zuerst vorliegt.
Codebeispiel 5 zeigt eine Implementierung des in Abb.7 gezeigten Ablaufs, wobei die Aufgaben der Tasks hier recht einfach gehalten sind.
Codebeispiel 5: Implementierung einer komplexen Ablaufkette
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "world");
task1.thenCombineAsync(task2, (s1, s2) -> String.join(" ", s1, s2) )
.thenApplyAsync( String::toUpperCase )
.thenAcceptAsync( System.out::println );
Zuerst werden zwei Tasks definiert, durch die jeweils der String "hello" bzw. der String "world" erzeugt wird. Die beiden Tasks werden anschließend durch eine "und"-Methode (thenCombineAsync) verknüpft, wobei hier ein neuer String erzeugt wird, der die beiden Ergebnisse der Vorgänger durch ein Leerzeichen getrennt verkettet. Danach wird der String noch in Großbuchstaben umgewandelt und anschließend auf der Konsole ausgegeben. Jeder einzelne Task wird hierbei asynchron in einem eigenen Thread ausgeführt. Der Thread, in dessen Code der Ablaufpfad definiert wird, kann ungehindert weiterlaufen.
Die Klasse CompletableFuture bietet neben verschiedenen Verknüpfungsoperationen für Tasks auch Synchronisationsbarrieren (vgl. Abb.8). So kann z. B. mit der statischen Methode anyOf auf das erste Ergebnis von verschiedenen, asynchron ausgeführten spekulativen Berechnungen gewartet werden. Die Berechnungen können hierbei asynchron ablaufen. In Codebeispiel 6 werden drei parallele Aufrufe von getValueFrom durchgeführt. Man beachte, dass die Methode threadsicher sein muss und möglichst keine Synchronisierung bzw. Sequentialisierung beinhalten sollte. Sobald einer der Return-Werte vorliegt, wird er ausgegeben.
Codebeispiel 6: Synchronisation paralleler spekulativer Berechnungen
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(
() -> getValueFrom("goedel.hs-kl.de") );
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(
() -> getValueFrom("escher.hs-kl.de") );
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(
() -> getValueFrom("bach.hs-kl.de") );
CompletableFuture.anyOf(task1, task2, task3)
.thenAcceptAsync( System.out::println );
Schaut man sich das API der Klasse CompletableFuture näher an, so erscheint es auf den ersten Blick recht unübersichtlich. Das liegt daran, dass es jede Methode in verschiedenen Varianten (Überladungen) gibt. Prinzipiell kann bei jeder Task-Ausführung festgelegt werden, ob er asynchron ausgeführt wird und wenn ja, welcher Threadpool benutzt werden soll. Weiter gibt es Ausführungsmethoden, die einen Wert liefern, wie z. B. thenApply oder Ergebnisse nur konsumieren, wie z. B. thenAccept. Ein Thema, das wir hier komplett ausgeklammert haben, ist die Fehlermeldung bzw. -behandlung. Auch hierzu bietet die Klasse CompletableFuture einige Konzepte an. Hat man den Aufbau des APIs erstmal durchschaut, lassen sich mit der Klasse bequem komplexe asynchrone Abläufe definieren und starten sowie komfortabel benutzbare asynchrone APIs entwickeln.
Resümee
Nachdem in den früheren Versionen Java um zahlreiche Concurrency-Tools erweitert wurde, stehen ab der Version 8 auch mächtige Frameworks zur Verfügung, mit deren Hilfe Anwendungen durch Parallelisierung einfacher beschleunigt werden können. Zu nennen sind hier insbesondere die parallelen Streams und die Klassen CompletableFuture. Da der Einsatz dieser Frameworks recht einfach erscheint, ist ihr Einsatz in der täglichen Arbeit verlockend. Doch Vorsicht, ganz so unbedarft darf man die Frameworks nicht einsetzen. Ein grundlegendes Verständnis der nebenläufigen Programmierung und der Funktionsweise der Frameworks ist für deren gewinnbringenden Einsatz unerlässlich. Wir haben hier anhand einfacher Beispiele nur einige zu beachtende Aspekte ansprechen können und hoffen, dass wir Sie für das Thema sensibilisiert haben. Hält man sich an das Programmiermodell und besitzt man ein Verständnis der internen Abläufe, steht aber dem Einsatz der Parallelisierungsframeworks in der Praxis nichts mehr im Weg.
- JavaDoc: Arrays
- JavaDoc: Streams
- Wikipedia: ForkJoin-Pattern
- Wikipedia: Future-Pattern
- JavaDoc: CompletableFuture