Über unsMediaKontaktImpressum
Jochen Mader 12. Januar 2015

Vert.x 3: Reactive Microservices

Der nicht mehr zu bremsende Node.js-Hype [1] der letzten Jahre ging auch an mir nicht spurlos vorbei. Allerdings kam ich nie über ein paar Experimente hinaus. Meine Vorbehalte gegenüber JavaScript auf dem Server waren zu groß. Wenn ich ehrlich bin, sind sie es immer noch. Dank eines Kollegen stolperte ich über Vert.x [2]. Was Anfangs aussah wie ein simpler Node.js-Klon offenbarte schnell eine Reihe von Ideen, die mich zu einer tiefergehenden Auseinandersetzung mit dem Framework bewegten. Worum genau es sich bei diesen Ideen handelt, was von Version 3, an der gerade gearbeitet wird, zu erwarten ist und warum ich eigentlich so ein Fan von Reactive Programming bin, möchte ich Ihnen in diesem Artikel etwas näher bringen.

Bitte beachten Sie: Um die Beispiele in diesem Artikel ausprobieren zu können, benötigen Sie Vert.x 3. Im Vert.x GitHub-Rep [3] findet sich die aktuelle Version und Anleitungen wie man es baut. Für die Rx-Beispiele benötigen Sie zusätzlich vertx-rx [4]. Das Skelett eines Basis-Projekts findet sich hier [5].

Der Code in diesem Artikel basiert auf einem Stand von Vert.x 3 der sich derzeit noch stark ändert. Es ist zwar schon eine gewisse Stabilität in den Basis-APIs eingekehrt, Änderungen können bis zum Release aber jederzeit passieren.

Buzzword Bingo

Bringen wir es hinter uns: Vert.x ist ein reaktives, modulares, nachrichtenorientiertes, polyglottes Framework, mit dem sich hochperformante Microservice-Architekturen aber auch "klassische" Anwendungen umsetzen lassen. Was hier versprochen dürfte bei Gartner-Hype-Cycle affinen Managern freudig gerötete Wangen hervorrufen, bei Entwicklern eher nervöse Zuckungen.

Was den BW-Bingo-Effekt zumindest für mich etwas abmilderte, war der Umstand, dass an Vert.x bereits seit 2011 gearbeitet wird und viele der Features bereits existierten oder absehbar waren. Das Framework kam also vor den Trends. Und bevor ich mich anhöre wie jemand, der Ihnen ein neues Enterprise-Snake-Oil verkaufen möchte, beginne ich damit, die einzelnen Begriffe und ihre Bedeutung für Vert.x genauer zu betrachten.

Reactive Programming

Vert.x gehört zur noch recht jungen Gattung der reaktiven Server Frameworks. Diese Bezeichnung wurde maßgeblich durch die Veröffentlichung des Reactive Manifesto [6] von Entwicklern aus dem Akka-Umfeld [7] geprägt. Am 22.07.2013 wurde hier begonnen, eine Liste von Eigenschaften zu definieren, die zukunftsfähige moderne Server Frameworks erfüllen sollten. Ich würde Ihnen empfehlen, das ganze Manifest zu lesen. Im Folgenden habe ich nur eine sehr subjektiv gekürzte Variante der Kernaussagen zusammengefasst:

  • Responsive: Das Ziel der Anwendung ist, konsistent niedrige Antwortzeiten zu erreichen. Dies erleichtert das Finden von Problemen. Der User wird es uns auch danken.
  • Resilient: Fehler in Einzelbereichen der Anwendung führen nicht zum Ausfall oder einer signifikanten Verschlechterung der Antwortzeiten. Fehler werden innerhalb der Komponenten behandelt und können keine anderen Komponenten mit in den Abgrund reißen.
  • Elastic: Das System bleibt unter wechselnder Last responsiv. Sharding und Replication sind kein Sonderfall sondern ein Kernbestandteil des Frameworks. Die Verfügbarkeit von Ressourcen kann dynamisch an die aktuelle Arbeitslast angepasst werden.
  • Message Driven: Das System setzt auf asynchrones Messaging zum Austausch von Informationen. Konzepte wie non-blocking Backpressure und die Verwendung von non-blocking IO verringern den System Overhead und erlauben eine optimale Ausnutzung vorhandener Ressourcen.

Die Punkte 1-3 gehören ja schon fast zum guten Ton und man findet sie in leicht abgewandelter Form in der Beschreibung aller Application Server und Frameworks. Richtig interessant ist Punkt 4. Der Hauptunterschied zwischen dem was das Manifest beschreibt und dem was viele aus der Java-Welt unter Message Driven verstehen, ist die Granularität. JMS und Co. wurden oft zur Entkopplung ganzer Systeme genutzt. Mehrere "klassische" JEE-Anwendungen würden z.B. RabbitMQ [8] verwenden um asynchron Nachrichten austauschen zu können.

Im Falle von Akka, Vert.x und Co. wird der asynchrone Austausch von Nachrichten innerhalb der Anwendung zum bestimmenden Merkmal. Statt Anwendungen die miteinander auf diesem Weg kommunizieren sind es hier die einzelnen Komponenten. Die Grundidee hinter dem Ganzen leitet sich  aus dem Actor Model [9] ab: "Aktoren sind nebenläufige Einheiten, die nicht über einen geteilten Speicherbereich verfügen, sondern ausschließlich über Nachrichten kommunizieren. Die Kapselung des Zustandes des Aktors ähnelt dem Prinzip der Kapselung in der objektorientierten Programmierung. Jeder Aktor verfügt über einen Posteingang, eine Adresse und ein Verhalten."

Wie schon erwähnt ist auch Vert.x eine Implementierung dieses Konzepts, worauf schon auf der vertx-Startseite [10] hingewiesen wird: "Concurrency: Simple actor-like concurrency model frees you from the pitfalls of traditional multi-threaded programming."

Aktoren werden in Vert.x als Verticle bezeichnet. Sie unterliegen einem Satz von Regeln, der sich aus dem Actor Model ableiten lassen:

  1. Ein Verticle wird immer nur von einem einzigen Thread ausgeführt.
  2. Es wird immer der gleiche Thread zur Ausführung verwendet.
  3. Blockierende Aufrufe sind grundsätzlich verboten.
  4. Es gibt kein Shared Memory, Daten werden ausschließlich über den eingebauten EventBus ausgetauscht.

Zum besseren Verständnis wird es Zeit für ein paar Code-Beispiele.

Verticles in Action

Im Folgenden möchte ich ein kleines Ping-Beispiel vorstellen, das die wichtigsten Aspekte des Anwendungsaufbaus in Vert.x abdeckt. Anfangen möchte ich mit einem Verticle, dessen Aufgabe es ist, Ping-Nachrichten über den EventBus zu verschicken und auf einen passende Antwort zu warten.

[source,java]
.PingVerticle.java

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
      
public class PingVerticle extends AbstractVerticle{
      
    private static final Logger log = LoggerFactory.getLogger(PingVerticle.class);
  
    @Override
    public void start() throws Exception {
        vertx.setTimer(1000, msg -> {
          vertx.eventBus().send("ping", new DeliveryOptions().
          setSendTimeout(500), reply -> {
              if (reply.succeeded()) {
                  log.info("Received respone: " + reply.result().body());
              } else {
                  log.info("Received no respone");
              }
          });
        });
                
        vertx.eventBus().publish("pingToLog", "ping");
    }
}

Zeile 13: Erzeuge einen Timer der alle 1000 ms ausgelöst wird und die entsprechende Aktion durchführt.
Zeile 14: Senden einer Nachricht an die Adresse "ping".
Zeile 15: In den DeliveryOptions wird festgelegt, dass maximal 500 ms auf eine Antwort gewartet wird.
Zeile 16: Bei Erfolg wird die Anwtort geloggt.
Zeile 19: Sollte in der angegebenen Zeit keine Antwort kommen loggen wir einen Fehler.
Zeile 24: Einmaliges Versenden via Publish an _alle_ "pingToLog"-Empfänger

Wo ein Sender ist, braucht es natürlich auch einen Empfänger. Das folgende PongVerticle reagiert auf die versendeten Nachrichten.

[source,java]
.PongVerticle.java

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
  
public class PongVerticle extends AbstractVerticle{
  
    private static final Logger log = LoggerFactory.getLogger(PongVerticle.class);
      
    private long counter = 0;
        
    @Override
    public void start() throws Exception {
        vertx.eventBus().consumer("ping").handler(msg -> {
          counter++;
            msg.reply("pong "+msg.body());
        });
          
        vertx.eventBus().consumer("pingToLog").handler(msg -> {
            log.info("Received a pingToLog. The counter value is "+counter);
        });
          
    }
}

Zeile 14: Registrieren eines Consumers unter der Adresse "ping".
Zeile 15: Counter inkrementieren.
Zeile 16: Anfrage mit einem "pong" beantworten.
Zeile 19: Registrieren eines Consumers unter der Adresse "pingToLog".
Zeile 20: Keine Antwort senden aber das Ergebnis ins Log schreiben.

Im Falle des PongVerticles stellt jeder Handler eine Adresse dar, unter der dieses zu erreichen ist. Da das Verticle nur einen Thread besitzt, werden eingehende Nachrichten nach dem FIFO-Prinzip abgearbeitet und Stück für Stück den Handlern zu übergeben. Somit sind Threading-Primitivas wie Locks, Atomics usw. unnötig, um den verwendeten Counter zu schützen. Alles Gute hat seinen Preis und der wurde in Punkt 3 dargelegt: Blockieren ist verboten. Auf den ersten Blick ist das nicht schlimm, schließlich entfällt die Notwendigkeit für klassische Synchronsiation.

Irgendwann wollen wir aber mit der Außenwelt Kontakt aufnehmen. Klassische I/O-Operationen haben aber den Nachteil zu blockieren. Ein Beispiel ist das Lesen aus einer Datei. Sobald man eine Datei öffnet, passiert folgendes: Zuerst signalisieren wir dem Betriebssystem unsere Absicht. Dieses redet nun mit dem Festplattencontroller, der seinerseits die Festplatte anfahren lässt. Nachdem der Lesekopf repositioniert wurde, fangen wir an zu lesen. Und die ganze Zeit hängt unser Thread in der Leseoperation und wartet, bis die ersten Bits herausfallen.

Auch Sockets zeigen hier ein ähnliches Verhalten. Wenn wir versuchen, Daten aus einem zu lesen wird der lesende Thread vom Start der Leseoperation bis zum Eintreffen der Daten vom Client blockiert. Im Falle von Vert.x würde bei der Verwendung blockierender Sockets somit ein einziger langsamer Client reichen, um den Server faktisch zum Stillstand zu bringen.

non-blocking I/O

Glücklicherweise bietet der JDK im Package _java.nio_ seit Version 1.4 alles um dieses Problem zu lösen. Hier wird auf Funktionen des Betriebssystems zurückgegriffen, die es erlauben, mit Dateien und Sockets zu arbeiten ohne dabei zu blockieren.

[soruce,java]
.NIOexample.java

Selector selector = null;
ServerSocketChannel server = null;
selector = Selector.open();
server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
  selector.select();
  for (Iterator i = selector.selectedKeys().terator(); i.hasNext();) {
    SelectionKey key = i.next();
    i.remove();
    if (key.isConnectable()) {
      ((SocketChannel)key.channel()).finishConnect();
    }
    if (key.isAcceptable()) {
      SocketChannel client = server.accept();
      client.configureBlocking(false);
      client.socket().setTcpNoDelay(true);
      client.register(selector, SelectionKey.OP_READ);
    }
    if (key.isReadable()) {
      ...
    }
  }
}

Zeile 7: Selector für eingehende Verbindungen registrieren.
Zeile 9: Dieser Aufruf blockiert bis _mindestens_ eine Verbindung bereit ist.
Zeile 10: Der Selector gibt 1 - n SelectionKeys zurück, die wir nun abarbeiten.
Zeile 13: Wird aufgerufen, wenn der Verbindungsaufbau abgeschlossen ist.
Zeile 16: Eine Verbindung wurde akzeptiert.
Zeile 20: Den Selector bei dem neu erzeugten SocketChannel registrieren um
Leseoperationen durchführen zu können.
Zeile 22: Lesen.

_selector.select_ liefert immer nur die Socketverbindungen aus denen ohne zu blockieren gelesen werden kann. Somit kann ein auf NIO aufbauender Server eine immense Anzahl von Ereignissen in einem einzigen Thread abarbeiten. Einen minimalen Nachteil bringt das Ganze dann doch mit sich: Die Latenz nimmt gegenüber dem One-Thread-Per-Request-Prinzip leicht zu. Mit hohen Requestzahlen verschwindet dieser vermeintliche Nachteil allerdings.

Es gibt mittlerweile eine ganze Reihe von Bibliotheken, die NIO für uns kapseln und die Benutzung deutlich vereinfachen:

Das Vert.x-Projekt hat sich zur Verwendung von Netty [14], einem der bekanntesten Vertreter [15] seiner Gattung, entschieden. Jedes Verticle hat die Möglichkeit, eigene Netty-basierte TCP-, UDP- und HTTP-Server zu erzeugen. Das folgende Beispiel zeigt, was du tun musst, um einen HTTP-Server zu erzeugen.

[source,java]
.HttpVerticle.java

import io.vertx.core.Future;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.lang.rxjava.AbstractVerticle;
import io.vertx.rxjava.core.http.HttpServer;
  
public class HttpVerticle extends AbstractVerticle {
  
    private static final Logger log = LoggerFactory.getLogger(HttpVerticle.class);
  
    @Override
    public void start(Future startFuture) throws Exception {
  
        HttpServer httpServer = vertx
                .createHttpServer(new HttpServerOptions().setPort(8080))
                .requestHandler(req -> {
                    req.response().end(req.path());
                });
    
        httpServer.listen(status -> {
            if (status.succeeded()) {
                startFuture.complete();
                return;
            }
            startFuture.fail(status.cause());
        });
    }
}

Zeile 13: Eine neue Variante der Start-Methode: Das Ende des Verticle-StartUps wird erst signalisiert, wenn entsprechende Methoden am startFuture aufgerufen werden.
Zeile 16: Konfiguration des HttpServers um ihn auf Localhost unter Port 8080 zu starten.
Zeile 18: Registrierung eines Handlers um eingehende Requests mit dem Request-Path zu beantworten.
Zeile 21: Starten des Servers und Registrierung eines Startup-Handlers.
Zeile 22: Dieser Codeblock signalisiert die erfolgreiche Durchhführung des Starts an Vert.x.
Zeile 26: Hier wird Vert.x im Fehlerfall benachrichtigt.

Auch hier zeigen sich die in den anderen Beispielen vorgestellten Entwicklunsgmuster. Der Server wird in einem Verticle gestartet und arbeitet ebenfalls mit dem entsprechend zugeordneten Thread. Requests werden sequentiell von den registrierten Handlern abgearbeitet. Natürlich bietet Netty auch entsprechende Client-Funktionalitäten, auf die ich im folgenden Abschnitt eingehen werde.

Unit-Testing

Bis hierhin haben wir schon einen ordentlichen Ritt hinter uns. Die Grundkonzepte sollten Ihnen mittlerweile klar sein. Das beste Konzept ist allerdings nur so viel Wert, wie seine Testbarkeit. Auch wenn Unit-Tests mittlerweile nicht mehr aus der professionellen Softwareentwicklung wegzudenken sind, bekommen manche Entwickler bei dem Gedanken an das Testen asynchroner Frameworks graue Haare. Das Vert.x-Team hat viel Wert darauf gelegt, dem Entwickler alle Werkzeuge an die Hand zu geben, um das Schreiben von Tests so schmerzlos wie möglich zu gestalten. Das folgende Beispiel zeigt nicht nur, wie man asynchron testet, sondern liefert auch gleich ein Beispiel für die Verwendung des HttpClients.

[source,java]
.HttpVerticleTest.java

import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.test.core.VertxTestBase;
import org.junit.Before;
import org.junit.Test;
    
public class HttpVerticleTest extends VertxTestBase {
    @Before
    public void setUpTest() {
        vertx.deployVerticle(HttpVerticle.class.getName());
        waitUntil(() -> vertx.deployments().size() == 1);
    }
    
    @Test
    public void testGetHttpResponse() {
        HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions());
        httpClient.request(HttpMethod.GET, 8080, "localhost", "/hallo", response -> {
            response.bodyHandler(body -> {
                    assertEquals("/hallo", body.getString(0,body.length()));
                    testComplete();
                }
            );
        }).end();
        await();
    }
}

Zeile 8: VertxTestBase bildet die Basis für alle Unit-Tests.
Zeile 11: Vor dem Test müssen die zu testenden Verticles erstmal gestartet werden.
Zeile 12 Das Starten passiert asynchron. Da wir, vorbildlicherweise, die _start(Future<Void> startFuture)_-Methode im HttpVerticle implementiert haben, wird der Test erst fortfahren wenn diese erfolgreich durchgeführt wurde.
Zeile 17: Erzeugen eines asynchronen HttpClients über die Vert.x-API.
Zeile 18: Der Request soll via GET auf Port 8080 und den Pfad _/hallo_ gehen.
Zeile 19: Ein Body-Handler wird ausgeführt, wenn der HTTP-Client die vollständige Antwort vom Server empfangen hat.
Zeile 20: Testen, ob das erwartete Ergebnis zurückgekommen ist.
Zeile 21: Signal an den Runnner, dass der Test beendet ist.
Zeile 24: Request abschicken.
Zeile 25: Warten auf das Ende des Tests.


Trotz der asynchronen Natur des Frameworks finde ich den Test gut lesbar. Sobald man sich an die lambda-Schreibweise und die Verwendung von Operationen wie _waitUntil_, _await_ und _testComplete_ gewöhnt hat, wird das Schreiben von Tests schnell zur zweiten Natur.

Tipps: JDBC und viele andere APIs sind blockierend und deshalb nicht ohne Weiteres in Vert.x verwendbar. Dank des Erfolgs reaktiver Frameworks gibt es aber mittlerweile für eine Vielzahl von Datenbanken entsprechende non-blocking Treiber. Um den passenden Treiber zu finden hilft ein Blick in die Vert.x Module Registry [16]. Für den Fall, dass blockierende Aufrufe nicht zu vermeiden sind, bietet Vert.x sogenannte WorkerVerticles. Für diese werden die Regeln der "echten" Verticles an einer entscheidenden Stelle aufgeweicht: Statt der Zuweisung eines festen Threads kann für jede Ausführung ein anderer Thread aus dem internen ThreadPool verwendet werden. Dabei bleibt die Regel, dass ein Verticle immer nur von genau einem Thread ausgeführt werden darf, bestehen. Diese Option sollte ausschließlich (!) dann verwendet werden, wenn es keinen anderen Weg gibt, weil damit das Tuning der Anwendung deutlich erschwert wird.

Skalierung über den EventBus

Wie der EventBus zu verwenden ist, dürfte jetzt klar sein. Bleibt noch die Frage nach der Skalierung. Mit blockierenden Aufrufen ist der größte Zeitfresser in unserer Anwendung eliminiert. Allerdings brauchen auch alle Operationen eine gewissen Rechenzeit, die sich nach und nach aufsummiert. Irgendwann kommt man an den Punkt, an dem ein einziges Verticle die Arbeit nicht mehr bewältigen kann. Hierfür erlaubt der EventBus, diese mehrfache zu deployen und führt automatisch ein Round Robin über diese durch. Sobald z.B. das PongVerticle 2 mal deployt ist, würde jede Instanz 50% der eingehenden Ping-Requests zur Verarbeitung bekommen. Der EventBus bietet allerdings noch deutlich mehr. Tatsächlich handelt es sich um einen Distributed EventBus. Durch das Aktivieren der Clusterfunktionalitäten [17], wird der EventBus per Multicast oder explizite Konfiguration automatisch mit allen anderen Vert.x-Instanzen im Netz verbunden. Via Hazelcast [18] tauschen diese nun untereinander Informationen zu den registrierten Handlern aus. Events werden nun automatisch zwischen den einzelnen Knoten ausgetauscht. Einfacher geht es kaum.

Neuerungen am EventBus

Bisher habe ich vor allem die Verwendung des EventBus beschrieben. Im Folgenden möchte ich einen kurzen Blick auf die Änderungen werfen, die unter der Haube passieren. Der EventBus wurde einer Generalüberholung unterzogen und erlaubt nun deutlich tiefere Eingriffe in seine Funktionsweise. Da wäre zum Einen die Möglichkeit, den Serialisierungsmechanismus auszutauschen. Durch das Registrieren eines eigenen _MessageCodecs_ via _vertx.eventBus().registerCodec()_, lässt sich die JSON-Serialisierung z.B. durch Kryo [19] oder ProtoBuf [20] ersetzen. Zusätzlich kann man den Events nun auch weitere Metadaten in Form von Headern mitgeben. So könnte man z.B. Hash-Informationen in die Header verpacken:
_vertx.eventBus().send("ping", "pingnachricht", new DeliveryOptions().addHeader("md5","ea28a3b11ba38e21691f6e258ae5f5c1"))_
Ein weiteres wichtiges Feature ist die Möglichkeit, lokale Consumer zu erzeugen:
_vertx.eventBus().localConsumer("localPing")_
Dieser wird nicht im Cluster bekannt gemacht. Somit kann gezielt gesteuert werden, welche Nachrichten im Netzwerk verteilt oder lokal verarbeitet werden sollen. Dies ist vor allem im Zusammenspiel mit den im nächsten Abschnitt beschriebenen Streams wichtig, da so eine Verarbeitung im lokalen Knoten erzwungen wird.

RX Vert.x

Neben den technischen Änderungen am EventBus selbst, hat sich auch einiges beim Umgang mit den Events geändert. Wer sich etwas länger mit Vert.x beschäftigt, stellt mit der Zeit fest, dass man auch hier irgendwann in der Callback-Hölle landet. Vor allem komplexe Aggregation oder Filteroperation führen schnell zu einem unübersichtlichen Gewirr von Handlern. In solchen Fällen fühlt sich das Arbeiten mit einzelnen Events unpassend an. Glücklicherweise hat man sich zu diesem Thema schon vor einiger Zeit in einer anderen Ecke der Softwareentwicklungswelt Gedanken gemacht. Die Rede ist von Rx (Reactive Extensions) [21], einem Konzept, das von der .NET-Fraktion schon etwas länger verwendet wird. Statt der Handler werden hier sogenannte Observables verwendet, die als Quellen von Event-Streams dienen. Die Reactive Extensions bieten ein umfangreichen Satz an Möglichkeiten mit einem solchen Datenstrom umzugehen und ihn zu manipulieren. Bereits in Vert.x 2 gab es eine Rx-Implementierung unter dem Namen RxJava [22] aufsetzende mod-rxvertx [23]. Von dieser inspiriert wurde eine neue Variante entwickelt, die mittlerweile Bestandteil des Core-Frameworks ist. Somit lassen sich z.B. Map/Reduce-Operationen sehr einfach formulieren.
[source.java]
.MapReduceVerticle.java

import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.lang.rxjava.AbstractVerticle;
import rx.Observable;
import java.util.stream.Collectors;
  
public class MapReduceVerticle extends AbstractVerticle {
  
    private static final Logger log = LoggerFactory.getLogger(MapReduceVerticle.class);
  
    @Override
    public void start() throws Exception {
        Observable obs = vertx.eventBus().consumer("countCharacters").bodyStream().toObservable();
        obs
            .buffer(10)
            .map(samples -> samples.stream().
                    collect(Collectors.averagingInt(d -> d.length())))
            .subscribe(avLength -> {
                log.info("Durchschnittliche String-Länge: " + avLength);
            });
    }
}
Zeile 3: Importieren der Rx-ified AbstractVerticle-Variante.
Zeile 13: Erzeugen eines Streams aus einem Consumer der an der Adresse "countCharacters" lauscht.
Zeile 15: Jeweils 10 Nachrichten sammeln und diese als Block in die weitere Verarbeitung leiten.
Zeile 17: Die Länge eines Strings auf den String mappen und einen Collector übergeben der die durchschnittliche String-Länge ermittelt.
Zeile 18: Die Ergebnisse empfangen und in die Logs schreiben.


Die Reactive Extensions sind für eine Vielzahl von Sprachen und Frameworks [24] auf der JVM verfügbar und erfreuen sich großer Beliebtheit. Ihre einfache Lesbarkeit und Eleganz sind in meinen Augen derzeit noch unübertroffen.

Reactive Streams

Seit kurzem darf sich Vert.x auch zum Club der Frameworks mit Reactive Streams [25] zählen. Diese Spezifikation definiert einen Satz an Schnittstellen zum asynchronen Austausch von Event Streams zwischen verschiedenen Bibliotheken und Frameworks. Ziel war es, vor allem ein gemeinsames Verständnis für Flow Control durch non-blocking Back Pressure zu etablieren. So sollte ein überlasteter Consumer dies dem Producer asynchron und ohne Locking Overhead mitteilen können. Durch die Implementierung dieser APIs ist z.B. das Einbetten von Akka in Vert.x oder die gemeinsame Nutzung von Bibliotheken möglich. Wie sich das Ganze auf das reaktive Ökosystem auswirkt, wird sich bald zeigen. Zumindest wurden Reactive Streams mittlerweile von allen bekannteren Vertretern umgesetzt und haben das Backing von Pivotal, Typesafe, Oracle, Netflix, Twitter, ...

Module

Nachdem ich viel über das Schreiben von Verticles erzählt habe, möchte ich jetzt endlich darauf eingehen, wie man eine Anwendung paketiert. Anwendungsbestandteile werden in Vert.x als Module ausgeliefert. Gegenüber Vert.x 2 hat sich ihr Aufbau allerdings gravierend geändert. Bisher wurde ein stark an OSGi [26] angelehntes System eingesetzt, aber die Modularisierung wurde mittlerweile fast vollständig entfernt. Wo zuvor eine starke, auf Classloadern basierende Trennung eingebundener Module existierte, setzt man nun auf das Buildsystem. Einzig die Classloader-Trennung der Verticles ist noch optional verfügbar. Zuerst möchte ich sagen, dass ich ein großer Freund der OSGi-artigen Modularisierung bin, insbesondere da die JVM bis heute keine vernünftige Modularisierung beherrscht und wir uns immer noch mit der "Sea of Jars" herumschlagen. Auf der anderen Seite habe ich beim produktiven Einsatz von Vert.x die Erfahrung gemacht, dass ich das Modulsystem nicht wirklich brauche, da wir Vert.x-Anwendungen grundsätzlich als Fat-Modules ausliefern. Hier schlagen zwei Herzen in meiner Brust und ich habe noch keine finale Meinung dazu. Egal wie man dazu stehen mag, das Bauen einer Vert.x-Anwendung wird damit deutlich einfacher. Statt wie früher Dependencies, sowohl in der _module.conf_ als auch im Build-System pflegen zu müssen, befindet sich nun alles an einer Stelle.

3,2,1,0 ... Take of

Bleibt noch das Starten einer Vert.x-Anwendung. Um die Beispiele aus diesem Artikel in Aktion zu sehen, würde ich zuerst das eingangs erwähnte Beispielprojekt [27] ausprobieren. Nachdem Sie das Code-Beispiel in das Projekt übertragen haben, wird es Zeit, das sogenannte Main-Verticle zu schreiben. Dieses wird von Vert.x für den Bootstrap der Anwendung verwendet.
[source:java]
.MainVerticle.java

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.lang.rxjava.AbstractVerticle;
  
public class MainVerticle extends AbstractVerticle {
    @Override
    public void start(Future startFuture) throws Exception {
        vertx.deployVerticle(HttpVerticle.class.getName(), new DeploymentOptions().setIsolationGroup("myGroup"));
        vertx.deployVerticle(PingVerticle.class.getName(), new DeploymentOptions().setInstances(2));
        vertx.deployVerticle(PongVerticle.class.getName());
    }
}

Zeile 8: Das HttpVerticle wird mit der erwähnten Classloader-Isolation deployt.
Zeile 9: Von dem PingVerticle werden zwei Instanzen gestartet.
Zeile 10: Das PongVerticle wird ohne besondere Optionen gestartet.


In _build.gradle_ muss in der Zeile _attributes 'Main-Verticle':

'java:io.vertx.example.HelloWorldServer'_

der Klassenname durch den des MainVerticles ersetzt werden. Danach noch _./gradlew shadowJar_ ausführen und Sie können die Anwendung mit

_java -jar build/libs/simplest-proj-1.0-SNAPSHOT-fat.jar_

starten.

Polyglott

Ein Thema hätte ich dann doch noch. Im Artikel habe ich auf Beispiele in anderen Sprachen verzichtet, da hier noch heftig geschraubt wird. Die JVM ist mittlerweile die Heimat einer Vielzahl verschiedener Sprachen. Jede dieser Sprachen wartet mit einer Vielzahl verschiedener Konzepte auf. Von funktional über imperativ bis statisch oder dynamisch typisiert. Grundsätzlich ist es fast immer möglich, von einer dieser Sprachen aus eine Java-Bibliothek zu nutzen. Allerdings bieten praktisch alle Sprachen Features, die nicht ohne weiteres in Java abbildbar sind. An solchen Stellen müssen dann oft hässliche Kompromisse gemacht werden, die so manchen Entwickler an den Rand der Tränen bringen.

Um diesen entgegen zu wirken, wurde im Vert.x-Projekt entschieden, Möglichkeiten zu schaffen, die Vert.x-API in verschiedenen Sprach-Versionen zur Verfügung zu stellen. Gegenüber Vert.x 2 wurde das Schreiben solcher Sprach-Bindings durch den Umstieg auf einen generativen Ansatz deutlich vereinfacht. Hierfür wurde die Neuimplementierung aller Sprach-Module notwendig. Somit wird es noch eine Weile dauern, bis hier der gleiche Stand erreicht ist wie in Version 2. Allerdings zeigt die hohe Geschwindigkeit in der derzeit portiert wird, dass dieser Weg wohl der richtige war. Mit Stand heute finden sich im Vert.x-Repo Sprachmodule in unterschiedlichem Fertigstellungsgrad für: JavaScript, Groovy, Python, Scala und Ceylon. Clojure und Ruby dürften auch bald folgen.

Schluss, Ende, Aus

Ich könnte noch weiter und weiter in die Tiefen von Vert.x 3 vordringen (Dockerintegration, JCA, ...) aber irgendwann muss auch Schluß sein. Die Entwicklung hat unheimlich Fahrt aufgenommen und jede Woche kommen neue Module hinzu, die die Funktionalität mehr komplettieren. Meine persönlichen Highlights sind die Integration von Reactive Streams und Reactive Extensions gepaart mit den Umbauten am EventBus. Ich persönlich kann den Release von Version 3 kaum noch erwarten und ich hoffe Sie haben auch Lust bekommen, das Ganze mal anzutesten.
Autor

Jochen Mader

Jochen Mader ist Lead IT Consultant bei der codecentric AG in den Bereichen Big Data und Agile Software Factory. Derzeit beschäftigt er sich hauptsächlich mit Reactive Programming und Stream Processing.
>> Weiterlesen
Kommentare (0)

Neuen Kommentar schreiben