Über unsMediaKontaktImpressum
Heiko Seeberger 22. September 2015

Einführung in Akka Streams

Die große Bedeutung von Daten ist uns Anwendungsentwicklern schon seit langem bewusst. Aber die heutzutage oftmals rasant wachsenden Datenmengen sowie Anforderungen, diese sofort – also quasi in Echtzeit – zu nutzen, können es erforderlich machen, Daten als Datenströme zu betrachten. Als Beispiel sei ein Online-Shop genannt, der den Kunden unter anderem aufgrund ihres aktuellen Verhaltens potentiell geeignete Kaufempfehlungen präsentiert.

Ein anderes – etwas technischeres – Beispiel ist ein HTTP-Server. Sowohl die eingehenden Verbindungen, als auch die Anfragen auf einer bestehenden Verbindung können als Ströme betrachtet werden. Dabei wird eine wichtige Eigenschaft von Datenströmen offenbar, die sie deutlich von Collections unterscheidet: sie sind flüchtig und unbegrenzt. Oder, um mit Heraklit zu sprechen: "Wer in dieselben Flüsse hinabsteigt, dem strömt stets anderes Wasser zu".

Das Beispiel HTTP-Server macht uns auch klar, dass Daten über Systemgrenzen hinweg strömen können. Daraus folgt unmittelbar, dass wir Vorkehrungen treffen müssen, um zu vermeiden, dass bei einem System zu viele Daten schneller eintreffen, als sie verarbeitet werden können. Sonst würden wir im günstigsten Fall "bloß" Daten verlieren und im ungünstigsten Fall ein System zum Absturz bringen, zum Beispiel wegen eines OutOfMemoryErrors. Mit anderen Worten benötigen wir für Datenströme dringend eine Form von Flusskontrolle zur geeigneten Regelung der Datenrate, wie sie zum Beispiel bei TCP eingesetzt wird.

Beide Beispiele verdeutlichen auch, dass es bei Datenströmen weniger um die Gesamtheit der Daten geht, sondern vielmehr darum, deren Elemente zu transformieren, z. B. eine HTTP-Anfrage in eine HTTP-Antwort oder eine Reihe von Interaktionen mit dem Online-Shop in die Präsentation einer bestimmten Kaufempfehlung. Neben den Transformationen interessiert häufig auch das Versiegen von Datenströmen, z. B. das Abbrechen einer  HTTP-Verbindung wegen Netzwerkfehlern oder das Berechnen von Aggregaten von endlichen Datenströmen.

Bei Reactive Streams

Wenn wir auf der Java Virtual Machine (JVM) mit Datenströmen arbeiten möchten, dann können wir auf den im Frühjahr 2015 veröffentlichten Standard Reactive Streams [1] bauen. Dabei handelt es sich um eine Spezifikation zur Steuerung von Datenströmen über asynchrone Grenzen hinweg mittels nicht-blockierendem Rückdruck.

Um zu vermeiden, dass ein schneller Produzent von Daten einen langsameren Konsumenten überlastet, setzt Reactive Streams auf eine besondere Form von Flusskontrolle: Nur wenn der Konsument Bedarf in einer gewissen Größenordnung signalisiert, darf der Produzent dementsprechend Daten senden. Mit anderen Worten haben wir – wie in Abb.1 gezeigt – zwei Ströme, einen für die Daten und einen entgegengesetzt gerichteten für den Bedarf. Im Endeffekt führt das dynamisch und adaptiv zu einem Pull- oder Push-Verfahren, je nachdem welche der beiden Seiten gerade schneller ist.

Bei Reactive Streams wird Asynchronizität besonders stark betont. Wir haben am Beispiel des HTTP-Servers bereits gesehen, dass Daten über Systemgrenzen hinweg strömen können, was natürlich automatisch zu Asynchronizität führt. Aber auch innerhalb eines Systems kann asynchrone Verarbeitung dazu verwendet werden, um zu Skalierbarkeit und Widerstandsfähigkeit zu gelangen; weitere Details hierzu liefert das sehr empfehlenswerte Reaktive Manifest [2]. Um von Asynchronizität wirklich profitieren zu können, ist es von höchster Bedeutung, auf blockierende Verarbeitung zu verzichten. Daher schreibt Reactive Streams vor, dass nicht nur die Daten, sondern auch der Bedarf nicht-blockierend übermittelt werden muss.

Reactive Streams definiert ein API, welches jedoch nicht für Endanwender gedacht ist, sondern für Implementierungen. Zum Zeitpunkt der Veröffentlichung der Version 1.0.0 [3] gab es derer neben Akka Streams sieben weitere, u. a. RxJava und Vert.x. Für einen Vergleich sowie eine abrundende generelle Betrachtung von Reactive Streams sei der Vortrag von Mathias Doenitz auf den Scala Days Amsterdam 2015 empfohlen [4]. Dieser Vortrag hat nicht nur diesen Text, sondern auch das im Folgenden verwendete Beispiel maßgeblich inspiriert.

Akka

Bevor wir uns Akka Streams zuwenden, beleuchten wir ganz kurz Akka [5] als Ganzes. Im Kern stellt Akka eine Implementierung des Aktorenmodells [6] in Scala [7] dar, wobei Akka ebenfalls ein gleichwertiges Java-API bietet. Wir werden für die Code-Beispiele Scala verwenden, weil Sprachmerkmale wie Case Classes und Pattern Matching zu viel verständlicherem Code führen, als das mit Java möglich wäre.

Im Aktorenmodell sind Aktoren die fundamentalen Bausteine eines jeden Programms. Sie sind hierarchisch angeordnet, kommunizieren ausschließlich über asynchrone Nachrichten und teilen keinerlei veränderlichen Zustand. Fehler werden isoliert und vom in der Hierarchie höher angeordneten Aktor behandelt. Aus diesen Eigenschaften resultieren letztendlich
vertikale wie horizontale Skalierbarkeit sowie Widerstandsfähigkeit.

Akka bietet etliche weitere Module, die alle auf den Akka-Aktoren basieren und höherwertige Abstraktionen bieten, zum Beispiel für verteilte Systeme, Cluster, Replikation, Partitionierung, HTTP, etc. Für weitere Informationen sei auf die Akka-Website oder auf den Blog-Post A Map of Akka [8] verwiesen.

Akka Streams

Akka Streams ist eines dieser Module und stellt – der Name ist Programm – eine Implementierung der Spezifikation von Reactive Streams dar. Aktuell liegt die Version 1.0 vor, die noch als experimentell gekennzeichnet ist, was nicht bedeutet, dass die Funktionalität eingeschränkt oder gar teils fehlerhaft ist, sondern dass sich das API noch ohne Deprecation-Zyklen ändern kann.

Laut Dokumentation bestimmten Zusammensetzbarkeit und Wiederverwendung maßgeblich den Entwurf des API von Akka Streams. Das bedeutet, dass sich mit Akka Streams Datenstromtopologien – z. B. lineare Flüsse oder fast beliebig komplexe Graphen – aus einzelnen Teilen zusammensetzen lassen, die jedes für sich alleine in verschiedenen
Datenströmen wiederverwendet werden können.

Daraus resultierend unterscheidet das API zwischen den Beschreibungen von Datenstromtopologien und deren Ausführung: Die mit dem API erstellten Beschreibungen sind unveränderlich und können ohne Einschränkung von verschiedenen Programmen geteilt werden. Zur Ausführung gelangt man erst durch die sogenannte Materialisierung, einen Prozess, der die Beschreibung auf eine Ablaufumgebung – standardmäßig natürlich  Akka-Aktoren – abbildet und den Datenstrom startet.

Um Akka Streams in einem sbt-Projekt [9] – sbt ist quasi das Standard-Build-Werkzeug für Scala-Projekte – verwenden zu können, müssen wir die folgende Abhängigkeit definieren:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"

Lineare Datenströme

Um uns mit dem API vertraut zu machen, betrachten wir erst einmal einfache lineare Ströme, die mit den folgenden Bausteinen beschrieben werden:

  • Source: ein Baustein mit einem Ausgang für Daten.
  • Flow: ein Baustein mit einem Eingang und einem Ausgang für Daten.
  • Sink: ein Baustein mit einem Eingang für Daten.

Diese Bausteine für sich stellen noch keinen eigentlichen Datenstrom dar. Wie in Abb.2 gezeigt, können sie jedoch mittels Kombinatoren geeignet zusammengeführt werden, woraus dann ein RunnableGraph entsteht, der einen ausführbaren Datenstrom repräsentiert.

Der einfachste Datenstrom entsteht durch das Verbinden einer Source mit einer Sink mittels des Kombinators to:

source.to(sink)

Um Flows mit einer Source zu verbinden oder mehrere Flows miteinander, wird der Kombinator via verwendet. Am Schluss steht wieder to zum Verbinden mit einer Sink:

source.via(flow1).via(flow2).to(sink)

Um einen beliebigen RunnableGraph auszuführen, rufen wir einfach die Methode run auf. Allerdings benötigen wir hierfür einen impliziten Materializer, um den oben beschriebenen Prozess der Materialiserung auszuführen. Aktuell bietet Akka Streams einzig den ActorMaterializer, der – wenig verwunderlich – ein ActorSystem, also quasi eine Ablaufumgebung für Akka-Aktoren benötigt.

Um uns das anhand eines Beispiels zu verdeutlichen, benötigen wir konkrete Bausteine. Auf Basis einer Collection erzeugen wir eine einfache Source und mittels Sink.foreach konstruieren wir eine Sink, der wir eine Funktion übergeben, welche für jedes Datenelement aufgrufen wird. Anstelle to und run hintereinander aufzurufen, können wir auch gleich runWith aufrufen:

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Source(List(1, 2, 3)).to(Sink.foreach(println)).run()
Source(List(1, 2, 3)).runWith(Sink.foreach(println))

So weit, so gut, aber wie können wir Datenelemente tatsächlich transformieren? Dazu dienen Prozessschritte – sogenannte Processing Stages –, von denen Akka Streams bereits zahlreiche vorgefertigt mitbringt. Falls diese nicht ausreichen, können eigene definiert werden.

Wenngleich sich Datenströme – wie eingangs gesagt – fundamental von Collections unterscheiden, so gleichen sich die jeweiligen Operationen, die sich um die Transformation von Elementen drehen, doch sehr. Beispielsweise können wir auf einer Source oder einem Flow – also Bausteinen mit einem Ausgang – map oder filter aufrufen, wie wir es von den Scala-Collections gewohnt sind:

val addOne = Flow[Int].map(_ + 1)
Source(List(1, 2, 3))
.via(addOne)
.filter(_ % 2 == 0)
.runWith(Sink.foreach(println))

Hier definieren wir zunächst mit addOne einen Flow vom Typ Int, also einen, welcher  Eingangs- und Ausgangswerte vom Typ Int hat. Dann fügen wir mit map einen Prozessschritt hinzu, um eins zu addieren. Das Resultat kombinieren wir mittels via mit einer Source, welche Ints produziert und fügen auf dem Resultat mittels filter einen weiteren Prozessschritt hinzu, der nur die geraden Zahlen behält.

Bei allen Ähnlichkeiten mit Collections besteht jedoch ein fundamentaler Unterschied, der aus obigem Code nicht ersichtlich ist: Jeder Prozessschritt stellt eine asynchrone Grenze dar. Daraus folgen zwei interessante Tatsachen. Zum einen entsteht ein gewisser Mehraufwand durch den asynchronen Nachrichtenaustausch im Vergleich zu "gewöhnlichen" synchronen Methodenaufrufen. Zum anderen können die einzelnen Prozessschritte, dadurch dass sie voneinander völlig entkoppelt sind, individuell materialisiert werden, um dadurch Skalierbarkeit und Widerstandsfähigkeit zu erzielen. So könnte zum Beispiel ein langsamer Prozessschritt redundant ausgelegt werden.

Zugegebenermaßen bietet der heute verfügbare ActorMarterializer noch nicht solche Merkmale, aber das grundlegende Design – insbesondere die Trennung von Beschreibung und Ablauf – eröffnet solche Möglichkeiten, ebenso wie das Zusammenfassen von einzelnen Prozessschritten zu einem synchronen Teilprozess. Da Akka Streams noch jung ist, darf man davon ausgehen, dass kommende Versionen in diesem Bereich einige interessante Erweiterungen bringen werden.

Ein Aspekt, den wir bisher vernachlässigt haben, ist der Rückgabewert des Materialisierungsprozesses, also der Methode run. Dazu müssen wir wissen, dass jeder Baustein – also jede Source, jeder Flow und jede Sink – mit einem Typ für den sogenannten materialisierten Wert parametrisiert ist. Mit anderen Worten produziert jeder Baustein beim Materialisieren einen Wert eines bestimmten Typs. Ein paar Beispiele hierfür:

  • Source(List(1, 2, 3)) hat den Typ Source[Int, Unit], wobei der zweite Typparameter für den materialisierten Wert steht, hier also wenig interessant Unit.
  • Source.actorRef[A](...) hat den Typ Source[A, ActorRef], erzeugt beim Materialisieren einen Akka-Aktor und gibt die zugehörige ActorRef zurück, an die wir Nachrichten vom Typ A schicken können, welche dann in den Datenstrom emittiert werden.
  • Sink.foreach[A](...) hat den Typ Sink[A, Future[Unit]] und gibt beim Materialisieren eine Future zurück, anhand derer wir feststellen können, wann der Datenstrom beendet ist.

Wenn wir zwei Bausteine mit via oder to kombinieren, dann wird als Standard der linke materialisierte Wert verwendet. Wir können dieses Verhalten jedoch modifizieren, indem wir viaMat bzw. toMat anstelle von via bzw. to aufrufen und zusätzlich eine Funktion übergeben, welche beide materialiserte Werte als Argumente entgegennimmt und einen beliebigen neuen Wert zurückgibt, zum Beispiel den rechten oder ein Tupel aus beiden. Wenn wir statt to die Methode runWith verwenden, wird übrigens der rechte Wert zurückgegeben, sodass wir in obigem Beispiel eine Future[Unit] erhalten.

Graphen

In vielen Fällen reichen lineare Datenströme aus, aber manchmal benötigen wir komplexere Topologien. Dafür bietet uns Akka Streams die Möglichkeit, Datenströme als gerichtete Graphen zu beschreiben. Da lineare Datenströme nichts anderes sind als besonders einfache gerichtete Graphen, arbeiteten das API und die Implementierung von Akka Streams letztendlich nur mit Graphen.

Wir können hier nicht allzu sehr ins Detail gehen, sondern betrachten mit Broadcast und Merge zwei Bausteine, die in nichtlinearen Graphen besonderes oft verwendet werden. Bei Broadcast handelt es sich um einen Baustein, der einen Eingang hat und mehrere Ausgänge. Eingehende Datenelemente werden an alle Ausgänge emittiert, wenn all diese Bedarf signalisieren. Mit Broadcast können wir also einen Datenstrom in mehrere parallel laufende aufsplitten.

Offenbar brauchen wir ein Pendant, welches mehrere Datenströme zusammenführen kann. Ein solches liegt mit Merge vor, einem Baustein mit mehreren Eingängen und einem Ausgang. Sobald ein Eingang ein Datenelement verfügbar hat, wird dieses an den Ausgang emittiert, sofern dort Bedarf besteht.

In Abb.3 sehen wir ein Beispiel, in dem ein Datenstrom zunächst mittels Broadcast auf zwei Ströme aufgesplittet und anschließend mittels Merge wieder zusammengeführt wird. Da der Datenstrom einen Eingang und einen Ausgang besitzt, handelt es sich von außen betrachtet um einen einfachen linearen Flow, den wir folgendermaßen erzeugen können:

Flow() { implicit builder =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
val flow01 = builder.add(Flow[Int].map(_ + 1))
val flow02 = builder.add(Flow[Int].map(_ * 2))
val merge = builder.add(Merge[Int](2))
broadcast.out(0) ~> flow01 ~> merge.in(0)
broadcast.out(1) ~> flow02 ~> merge.in(1)
(broadcast.in, merge.out)
}

Hier sehen wir, dass zur Definition eines Graphen ein veränderlicher FlowGraph.Builder als Parameter einer Konstruktionsfunktion zum Einsatz kommt. Darin werden mittels add sämtliche Bausteine zum Graphen hinzugefügt und anschließend deren sogenannte Inlets und Outlets mit dem Operator ~> auf geeignete Weise verbunden.

Ganz ähnlich lassen sich auch die anderen Bausteine für lineare Datenströme – also Sources bzw. Sinks – auf Basis komplexerer Topologien aus Graphen bilden, indem die Konstruktionsfunktion ein Outlet bzw. ein Inlet zurück gibt. Selbstverständlich können wir auch einen geschlossenen Graphen erstellen, der dann einen RunnableGraph darstellt und
daher ausgeführt werden kann.

Beispiel

Nun kennen wir also die Grundlagen von Akka Streams. Zur Vertiefung betrachten wir im Folgenden ein Beispiel, bei dem wir die Zahl pi mittels Monte-Carlo-Simulation [10] berechnen. Das könnten wir zwar mit Sicherheit nicht nur mit weniger Code, sondern auch effektiver ohne Akka Streams implementieren, aber es ist dennoch ein illustratives Beispiel. Der gesamte Beispiel-Code ist übrigens Open Source unter der Apache-Lizenz [11].

Die Grundidee, die wir umsetzen wollen, besteht darin, eine Vielzahl zufälliger Punkte in einem Quadrat der Seitenlänge eins zu erzeugen und anhand des Satzes von Pythagoras [12] zu unterscheiden, ob diese innerhalb oder außerhalb eines Viertelkreises, dessen Mittelpunkt auf eine Ecke des Quadrates fällt, liegen. Anhand der bekannten Formeln für den Flächeninhalt von Kreis und Quadrat, ergibt sich der Wert für pi damit als i/n*4 wobei i die Anzahl der Punkte innerhalb des Viertelkreises und n die Gesamtzahl der Punkte darstellt. Die komplette Umsetzung als ausgeführter Datenstrom sieht folgendermaßen aus:

Source(newRandomDoubleIterator _)
.grouped(2)
.map { case Seq(x, y) => (x, y) }
.via(toSample)
.scan(State(0, 0))(_.next(_))
.conflate(identity)(Keep.right)
.via(onePerSecond)
.map(state => f"After ${state.nrOfSamples}%,10d samples π is approximated as ${state.pi}%.6f")
.take(nrOfSteps)
.map(println)
.runWith(Sink.onComplete(_ => system.terminate()))

Wir beginnen den Datenstrom mit einer Source auf Basis eines Iterators für Werte vom Typ Double innerhalb von 0 und 1, den wir auf folgende Weise erzeugen:

def newRandomDoubleIterator() = new Iterator[Double] {
override def hasNext = true
override def next() = Random.nextDouble()
}

Anschließend fassen wir mit grouped(2) zwei Elemente zusammen und transformieren die resultierende Seq[Double] – von der wir wissen, dass sie genau zwei Elemente enthält – zu einem Wert vom Typ Point, einem Typ-Alias für Tuple2[Double, Double]:

type Point = (Double, Double)

Somit haben wir eine Source[Point], die wir durch via(toSample) mit einem Flow[Point, Sample, Unit] verbinden, also einem Flow, der einen Eingang für Points und einen Ausgang für Samples hat. Dabei stellt Sample einen Aufzählungstyp dar, der entweder Inside oder Outside sein kann:

sealed abstract class Sample
object Sample {
case object Inside extends Sample
case object Outside extends Sample
}

Den zuvor genannten Flow erzeugen wir auf folgende Weise, also ganz ähnlich, wie im obigen Beispiel zu Graphen:

def toSample = Flow() { implicit builder =>
import FlowGraph.Implicits._
val broadcast = builder.add(Broadcast[Point](2))
val collectInside = builder.add(Flow[Point].filter(isInside).map(_ => Sample.Inside))
val collectOutside = builder.add(Flow[Point].filter(isOutside).map(_ => Sample.Outside))
val merge = builder.add(Merge[Sample](2))
broadcast.out(0) ~> collectInside ~> merge.in(0)
broadcast.out(1) ~> collectOutside ~> merge.in(1)
(broadcast.in, merge.out)
}

Der Datenstrom wird mittels Broadcast auf zwei Ströme aufgesplittet. Einer nutzt filter(isInside), um nur diejenigen Points zu behalten, die innerhalb des Viertelkreises liegen, und transformiert diese anschließend zu einem Inside. Der andere ist analog implementiert, verwendet aber isOutside und transformiert zu einem Outside. Die verwendeten Methoden isInside und isOutside definieren wir unter Verwendung von Pythagoras so:

def isInside(point: Point): Boolean = {
val (x, y) = point
x * x + y * y < 1
}

def isOutside(point: Point): Boolean = !isInside(point)

Anschließend rufen wir .scan(State(0, 0))(_.next(_)) auf, um fortlaufend das aktuelle Ergebnis zu berechnen. scan funktioniert ähnlich wie das von den Scala-Collections bekannte foldLeft, wendet also eine binäre Operation auf ein Aggregat und das aktuelle Element an – hier funktiert die Klasse State als Aggregat und deren Methode next als binäre Operation:

case class State(nrOfInsideSamples: Long, nrOfSamples: Long) {
def pi: Double = nrOfInsideSamples.toDouble / nrOfSamples * 4

def next(sample: Sample): State = sample match {
case Sample.Inside => State(nrOfInsideSamples + 1, nrOfSamples + 1)
case Sample.Outside => State(nrOfInsideSamples, nrOfSamples + 1)
}
}

Offensichtlich hält State Werte für die Anzahl der innenliegenden Punkte und die Gesamtzahl vor, auf Basis derer mit pi der Wert von pi berechnet werden kann. Die Methode next aktualisiert den State geeignet, je nachdem, ob ein Inside oder Outside übergeben wird.

Als nächstes rufen wir conflate(identity)(Keep.right) auf, um dem bisherigen, stromaufwärts liegenden Teil des Datenstroms zu ermöglichen, schneller zu fließen, als der folgende, stromabwärts liegende Teil. Dazu wendet conflate – wieder analog zu foldLeft – die übergebene Funktion immer dann auf ein Aggregat und das aktuelle Datenelement an, wenn kein Bedarf vorliegt. Wir werfen mit Keep.right das Aggregat weg, behalten also nur den letzten State, weil dieser bereits alle nötigen Werte aggregiert hat.

Nun verbinden wir den bisherigen Teil des Datenstroms mittels via(onePerSecond) mit einem Flow[State, State, Unit], der höchstens mit der Geschwindigkeit von einem Element pro Sekunde fließt und den wir folgendermaßen erzeugen:

def onePerSecond = Flow() { implicit builder =>
import FlowGraph.Implicits._

val ticks = builder.add(Source(1.second, 1.second, ()))
val zip = builder
.add(ZipWith[State, Unit, State](Keep.left)
.withAttributes(Attributes.inputBuffer(1, 1)))
val dropOne = builder.add(Flow[State].drop(1))

ticks ~> zip.in1
zip.out ~> dropOne.inlet

(zip.in0, dropOne.outlet)
}

Wieder erstellen wir einen Flow aus einem Graphen, der mit dem Baustein ZipWith den Eingang für Elemente vom Typ State mit einer Source[Unit] synchronisiert, welche wir derart erzeugen, dass jede Sekunde ein Wert emittiert wird. Da ZipWith nur dann ein Tuple2 emittiert, wenn beide Eingänge jeweils ein Element verfügbar haben, drosseln wir auf diese Weise die Geschwindigkeit des resultierenden Flow auf höchstens ein Element pro Sekunde. Da Akka Streams zur Optimierung des Durchsatzes mit Puffern arbeitet, die vorhandene Elemente auf Vorrat abrufen, müssen wir hier den Eingangspuffer auf eins setzen und das einzige zu früh abgerufene Element mit drop(1) verwerfen.

Der Rest ist recht einfach: Erst transformieren wir State mit map(...) zu einer informativen Nachricht, die unter anderem den Wert von pi enthält. Dann limitieren wir den Datenstrom mit take(nrOfSteps) auf eine bestimmte Zahl von Elementen. Abschließend drucken wir die Nachrichten mittels map(println). Die damit vorliegende Source[Unit] verbinden wir mittels
runWith(...) mit einer Sink , welche bei Beenden des Datenstroms – was hier durch obiges take sichergestellt ist – das ActorSystem terminiert, und lassen den so erstellten RunnableGraph gleich laufen.

Wenn wir dieses Beispiel ausführen – beispielsweise in sbt –, dann erhalten wir folgendes Ergebnis, das natürlich aufgrund des simulativen Ansatzes nicht exakt reproduzierbar ist:

[demoAkkaStreamsPi]> run 5
[info] Running de.heikoseeberger.pi.PiApp 5
After 82,414 samples π is approximated as 3.128449
After 356,527 samples π is approximated as 3.138321
After 671,475 samples π is approximated as 3.139637
After 964,700 samples π is approximated as 3.139911
After 1,366,326 samples π is approximated as 3.140024
[success] Total time: 7 s ...

Offenbar erfüllt dieser Ansatz seinen fachlichen Zweck recht gut. Selbstverständlich haben wir bei den Erläuterungen einige Details weggelassen, hoffen aber dennoch, dass das Beispiel auch seinen didaktischen Zweck erfüllt und zumindest ein Grundverständnis von den Möglichkeiten vermittelt, die Akka Streams bietet.

Fazit und Ausblick

Die Betrachtung von Daten als Datenströme gewinnt in der Anwendungsentwicklung an Bedeutung – dabei geht es in erster Linie um deren Transformation. Akka Streams ist eine Bibliothek, die den Standard Reactive Streams implementiert und sowohl die Bearbeitung von linearen Datenströmen, als auch von komplexen Graphen ermöglicht. Dabei legt Akka
Streams großen Wert auf Zusammensetzbarkeit und Wiederverwendung.

Wie schon angedeutet, können wir damit rechnen, dass zukünftige Versionen von Akka Streams einige interessante Erweiterungen bieten werden. Insbesondere die Nutzung der Cluster-Fähigkeit von Akka sowie das Zusammenfassen von Einzelschritten zu einem Prozessschritt, innerhalb dessen die Verarbeitung lokal und synchron erfolgt, erscheinen heiße Kandidaten für eine der kommenden Versionen.

Da Akka Streams fester Bestandteil von Akka HTTP ist und die Verwendung auch für weitere Module – zum Beispiel für Akka Persistence – geplant ist, können wir sicher sein, dass das heute noch experimentelle Modul in Zukunft eine bedeutende Rolle innerhalb von Akka spielen wird. Und da Implementierungen von Reactive Streams miteinander kompatibel sind, sind sogar heterogene Systeme denkbar, in denen zum Beispiel Akka Streams zusammen mit Vert.x vorkommen. Die Zukunft von Akka Streams erscheint also hell und vielversprechend.

Autor

Heiko Seeberger

Heiko Seeberger ist Fellow bei codecentric und ein international anerkannter Experte für Scala und Akka. Er hat mehr als 20 Jahre Erfahrung in der Beratung und Softwareentwicklung. Er ist Autor des Buches Durchstarten mit Scala.
>> Weiterlesen
Buch des Autors:

botMessage_toctoc_comments_9210