Über unsMediaKontaktImpressum
Lukas Berle 15. Mai 2019

Skalierbare Daten-Pipelines mit Apache NiFi

Eine zentrale Herausforderung von datenlastigen Systemen ist der Umgang mit vielfältigen, heterogenen Datenquellen. Ein wichtiger Teil ist der Datentransport, also der Transfer von Daten z. B. aus relationale Datenbanksystemen, Kafka, FTP-Servern oder IoT-Geräten als Quellsysteme in andere Systeme, wie z.B. ElasticSearch, Hadoop oder AWS S3. Während man früher für eine solche Herausforderung eigene Java-Applikationen geschrieben hätte oder vielleicht sogar einen Enterprise-Service-Bus verwendet hätte, kommt heute immer mehr die Forderung nach hoch skalierbaren, einfach bedienbaren und vielseitigen Tools.

Eine Herausforderung in der Erstellung solcher Daten-Pipelines ist das Thema Ausfallsicherheit. Netzwerke können wegbrechen, aber auch Quell- oder Zielsysteme. Dabei muss sichergestellt sein, dass keine Daten verloren gehen. Bei der Weiterentwicklung von Daten-Pipelines erweist es sich außerdem als hilfreich, wenn man keine langen Deploymentschritte durchlaufen muss. Apache NiFi etabliert sich immer mehr und mehr als ein Allrounder für genau solche Anforderungen.

Apache NiFi ist ein skalierbares Framework zur Implementierung und Monitoring riesiger Datenströme. Ursprünglich von der NSA als NiagaraFiles entwickelt, wurde NiFi 2014 an die Apache Foundation gegeben und ist seit 2015 Apache Top-Level Projekt.

Was NiFi besonders angenehm zu bedienen macht: Es besitzt einer Benutzeroberfläche, über die sich per Drag-and-drop verschiedene Komponenten konfigurieren und miteinander verbinden lassen. Das Ergebnis ist also ein visuell fassbarer Data-Ingestions-Prozess. Ein weiterer zentraler Vorteil ist, dass man oft keinen Programmieraufwand hat. Man benötigt für die Entwicklung der Pipeline auch keinen Webserver und auch keine Container-Orchestration-Platform wie Openshift. Außerdem muss man sich nicht selbst um verschiedene Libraries kümmern, die in der Lage sind, mit den Quell- und Zielsystemen umzugehen – denn NiFi kommt mit einer Vielzahl an Prozessoren. Auch entfallen bei NiFi langwierige Deployment- und Kompilierprozesse – denn die Daten-Pipelines in NiFi sind on-the-fly anpassbar und die erstellten Pipelines sind darüber hinaus built-in skalierbar. Außerdem ist ein Prozess meist schneller fassbar als der sauberste Programmcode und kann auch von Fachabteilungen interpretiert werden.

Die Konzepte

NiFi baut grundsätzlich auf einigen wenigen, einfachen Grundkonzepten auf. Die wichtigsten davon sind FlowFiles, Prozessoren und Connections.

Das FlowFile ist eine Key-Value-Map, die zusätzlich einen Body, den Content, besitzt. Das FlowFile ist vergleichbar mit HTTP-Requests: Diese bestehen aus einem Header mit Meta-Informationen und oft einem Body, der den eigentlichen Inhalt abbildet. Bei NiFi ist jede Nachricht, also z. B. jede Zeile einer SQL-Tabelle oder jede Nachricht aus einem Kafka Topic ein sogenanntes FlowFile. Im FlowFile stehen z. B. Informationen über das Quell-System, während im Body der eigentliche Inhalt steht.

Prozessoren sind das eigentlich Herzstück von NiFi. Prozessoren binden Datenquellen und -senken an und können Nachrichten sowohl transformieren als auch filtern.

Eine weitere wichtige Komponente sind die Connections. Diese verbinden Prozessoren. Gleichzeitig erfüllen Connections eine wichtige weitere Funktion, nämlich das Queuing. Liefert ein Prozessor, der die Daten abholt, schneller neue FlowFiles als der darauf folgende Prozessor sie verarbeiten kann, puffern Connections diese so lange, bis der darauf folgende Prozessor dazu kommt, die FlowFiles zu verarbeiten.

Ein Beispiel

Der Zusammenhang dieser Komponenten soll im Folgenden anhand eines kurzen Beispiels illustriert werden. Unser Ziel ist es, Tweets von Twitter zu extrahieren und in ein Kafka-Topic zu schreiben. Jeder Tweet wird repräsentiert durch ein relativ umfangreiches JSON, das als eines von vielen Attributes den eigentlichen Tweet-Text enthält. Nur dieser soll in ein Kafka-Topic geschrieben werden [1].

Zunächst benötigen wir NiFi. NiFi lässt sich als Archiv herunterladen und kann einfach entpackt werden [2]. Es ist sowohl unter Windows als auch unter Linux-Systemen lauffähig. Zum Starten muss im bin-Ordner die nifi.sh ausgeführt werden, und die Implementierung der Pipeline kann beginnen.
Zunächst benötigen wir einen Prozessor der Daten von Twitter liest. NiFi bietet aktuell ca. 295 Prozessoren. Der Prozessor GetTwitter macht dabei genau das was wir wollen: Er verbindet sich zu Twitter, liest die aktuell getweeteten Nachrichten als JSON und packt diese in ein FlowFile. Zunächst ziehen wir diesen Prozessor in die Workspace. Daraufhin müssen wir in den Einstellungen des Prozessors unsere Twitter-API-Account-Daten hinterlegen. Diese bekommen wir kostenfrei bei Twitter.

Wie erwähnt ist "ein Tweet" nicht nur ein Tweet-Text, sondern ein relativ großes JSON, das neben dem Text auch zahlreiche weitere Daten enthält, wie Zeitstempel, Informationen über den Tweeter und so weiter. Uns interessiert dabei nur das JSON-Attribut text, welches den eigentlichen Tweet-Text enthält. Wir brauchen also einen weiteren Prozessor, der dieses JSON-Attribut extrahiert. Dazu nehmen wir den Prozessor EvaluateJsonPath und ziehen ihn in den Workspace. Die diversen Prozessoren von NiFi funktionieren alle unterschiedlich, so ist ein Blick in die entsprechende Dokumentation ratsam. In diesem Fall müssen wir unter den Einstellungen des Prozessors ein neues Property hinzufügen. Wir nennen es text und definieren mit dem JsonPath-Ausdruck $.text, dass wir lediglich das Attribut text aus dem JSON extrahieren wollen. Mit dem Umsetzen des Properties Destination auf flowfile-content definieren wir, dass der Content unserer Flow-Files durch den Tweet-Text ersetzt werden soll. Zuletzt benötigen wir noch einen Prozessor, der die FlowFiles in ein Kafka-Topic schreibt. Hierfür ziehen wir einen weiteren Prozessor PublishKafka in den Workspace und konfigurieren die Adresse des Kafka-Servers sowie das Topic. Wenn wir den Prozess nun starten, werden durchgehend Tweets in unser Kafka-Topic geschrieben. Unser Ergebnis sieht dann wie folgt aus:

Weitere Features

Eine wichtige Anforderung für jede produktionsreife Plattform ist die Möglichkeit, ein automatisiertes Deployment durchzuführen und eine Versionierung um letztendlich eine Continuous-Delivery-/Integration-Pipeline aufzubauen. Grundbaustein sind natürlich Tests. Wie und mit welcher Technologie-Tests geschrieben werden ist sehr einzelfallabhängig und kommt auf die genauen Tasks an, die die Pipelines erledigt und insbesondere auch auf die Art der Datenquellen und -senken. Generell bietet sich ähnlich wie bei Java-Anwendungen JUnit an.

Die Versionierung kann mit der NiFi Registry erfolgen. Dies ist ein zusätzlicher Dienst, der einfach installiert werden kann. Sie benötigt eine Persistenzschicht. Typischerweise konfiguriert man hier ein Git-Repository, in dem der NiFi Flow persistiert wird. Sobald man sein NiFi Cluster mit der NiFi Registry verbunden hat, kann man NiFi Flows einfach committen, zwischen Versionen springen, aber auch z. B. Änderungen reverten.

Eine weitere wichtige Eigenschaft von NiFi ist dessen Erweiterbarkeit. Hierbei ist das nützlichste Feature das Hinzufügen eigener Prozessoren. Hierfür lässt sich via Maven ein Skeleton generieren. Mit Hilfe des folgenden Befehls lässt sich nach Maven Archetypes für NiFi filtern:

mvn archetype:generate -Dfilter=org.apache:nifi

Nach der Wahl des konkreten Archetypes und einiger weiteren Abfragen wird ein neues Maven-Projekt erstellt. Dies enthält u.a. eine Java-Klasse mit Methoden, die nur noch auf Ihre Implementierung warten.

…
  @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        // TODO implement
    }
…

Nach dem Kompilieren des neuen Prozessors kann er in NiFi importiert und genutzt werden.

Fazit

Insgesamt lässt sich sagen, dass NiFi ein herausragendes vielfältiges Instrument darstellt um Datenströme zu steuern und Transformationen auf diesen auszuführen. Neben genannten Features bringt NiFi auch noch einige weitere Dinge mit, wie sehr ausgeklügelte, feingranulare Security Policies, die verschiedenen Nutzern verschiedene Berechtigungen auf unterschiedlichen Teile der Daten-Pipeline geben. Außerdem ist man durch sogenannte Process Groups sehr gut in der Lage, Teile des Daten-Flows zu schachteln und damit auch den Überblick über große Daten-Pipelines zu behalten. Auch stellte sich heraus, dass NiFi absolut Multi-Developer-fähig ist. NiFi kann also mit mehreren parallelen Entwicklern, die alle an derselben Pipeline arbeiten, sehr gut umgehen. Alles in allem ist zu erwarten, dass NiFi in den nächsten Jahren mit hoher Wahrscheinlichkeit noch deutlich an Bedeutung gewinnen wird. Der Funktionsumfang und die sehr gute Nutzerfreundlichkeit sollten für alle Data Engineers überzeugende Argumente sein, um sich NiFi auf jeden Fall mal anzusehen.

Autor

Lukas Berle

Lukas Berle beschäftigt sich als Big Data Software Engineering Lead bei OPITZ CONSULTING mit den Themen Big Data, verteilten Systemen und deren Architekturen.
>> Weiterlesen
Das könnte Sie auch interessieren
botMessage_toctoc_comments_9210