Wie mit Apache NiFi Joins über unterschiedliche Datenquellen möglich werden
Apache NiFi wird bereits in vielen Unternehmen eingesetzt, um Daten automatisiert zwischen verschiedenen Systemen zu transferieren. Ursprünglich kommt es aus dem Big-Data-Bereich und wird häufig zur Beladung eines "Big Data Lake" genutzt.
Es müssen aber nicht immer Big-Data-Umgebungen sein. Apache NiFi kann auch bei kleineren Use Cases sinnvoll sein. Daher spielt das Open-Source-Tool in immer mehr Unternehmen eine wichtige Rolle. Hauptsächlich liegt dies an der Vielzahl der Anbindungsmöglichkeiten. Es können alle gängigen Datenbanksysteme, sowie die üblichen Big-Data-Services, aber auch Cloud-Dienste angebunden werden. Dazu werden keine tieferen Programmierkenntnisse benötigt und die Web-UI ermöglicht per Drag-and-drop die Erstellung eines sogenannten Dataflows. Wie damit Joins über verschiedenste Datenbanksysteme und andere Datenquellen erstellt werden können, wollen wir in diesem Blog zeigen.
Oftmals sind nicht alle benötigten Daten direkt in einer Datenbank verfügbar, sondern über unterschiedliche Systeme verteilt. Dennoch möchte man in bestimmten Fällen solche heterogen verteilten Daten zusammenführen und auswerten. Sind die Datensätze auf mehreren Tabellen in einer Datenbank aufgeteilt, können diese durch eine Join-Abfrage zusammengeführt werden. Das ist eine lange bekannte Funktion und für die meisten Leser wohl nichts Neues.
Die zusammengehörenden Datensätze werden dabei meist anhand eines Schlüssels (Primär- und Fremdschlüssel) oder "Keys" identifiziert und kombiniert. Diese Schlüssel (PK & FK) müssen bei allen abzufragenden Tabellen gleich sein. Nur dann werden logisch zusammengehörige Datensätze zurückgegeben. Das gleiche Prinzip wird bei Apache NiFi’s Lookup-Prozessor genutzt.
Von Anfang an…
Bevor wir uns die Details des Lookup-Prozessors anschauen, werden wir die Grundlagen von Apache NiFi erklären. Es gibt vier wichtige Begriffe, die bekannt sein müssen:
- FlowFile: Bei einem FlowFile handelt es sich im Prinzip um die Daten. Sie bewegen sich in der NiFi UI und können dort analysiert werden. Werden neue Daten gelesen, wird ein entsprechendes FlowFile erstellt. Ist dieses am Ziel angekommen und die Daten wieder geschrieben (z. B. in einer Datenbank oder einer Datei), so wird das FlowFile gelöscht. FlowFiles besitzen Attribute. Dabei handelt es sich um Key-Value-Paare, die im Memory liegen und den Inhalt (Content), welcher nicht im Memory gespeichert wird. Beim Content handelt es sich um den Inhalt, während Attribute lediglich Metadaten repräsentieren.
- Prozessor: Prozessoren agieren als "Black Boxes" in NiFi und erfüllen meist genau eine Aufgabe. Der SplitJson-Prozessor teilt bspw. ein JSON-Array auf. Prozessoren können auf die Attribute und den Content eines FlowFiles zugreifen und beide bearbeiten. In der aktuelle NiFi Version 1.17.0 gibt es mehr als 300 Prozessoren für die unterschiedlichsten Aufgaben.
- Queue/Connection: Ein Dataflow entsteht, wenn mehrere Prozessoren miteinander verknüpft werden. Das geschieht über Connections, bzw. Queues. Diese dienen gleichzeitig als Warteschlangen, falls ein Prozessor langsamer ist und die ankommenden FlowFiles nicht schnell genug bearbeiten kann.
- Controller-Service: Controller-Services agieren als Deamons im Hintergrund und stellen dabei eine bestimme Ressource bereit, die dann von den anderen Prozessoren oder Controller-Services genutzt werden kann. Ein Datenbank-Controller-Service, wie der DBCPConnectionPool-Service, stellt beispielsweise eine Datenbankverbindung bereit. Diese wird an einer zentralen Stelle konfiguriert und mehrere Prozessoren können darauf zugreifen. Insgesamt gibt es über 100 verschiedene Controller-Services, die Cloudverbindungen, E-Mail-Funktionalitäten, Datenbankverbindungen und viele weitere Funktionen bereitstellen.
Wohin soll es gehen?
Wir haben bereits erwähnt, dass wir Datensätze über verschiedene Datenbankmanagementsysteme anreichern wollen. Konkret kommen Quelldaten aus einer REST-API, die dann mit Datensätzen aus einer MySQL- und einer MongoDB-Datenbank angereichert werden. Sind die Daten angereichert, werden die benötigten Felder gefiltert und können weiterverarbeitet werden.
Wir haben den Use Case für ein besseres Verständnis vereinfacht dargestellt.
Die Datensätze aus der REST-API kommen als JSON-Daten in NiFi an und enthalten jeweils den Namen, den Vornamen, die Büro-ID und die ID einer Person. In der MySQL-Datenbank befinden sich Informationen zu vorhandenen Büros. Die Tabelle enthält vier Spalten: Büro-ID, Straße, Stadt und Postleitzahl. In der MongoDB befinden sich wiederum Daten über die jeweiligen Büroleiter:innen. Dort liegen JSON-Datensätze mit Namen, Vornamen und Büro-ID.
Aus den angereicherten Daten sollen am Ende der Name und Vorname der Person, die Stadt des Büros und der Name und Vorname des jeweiligen Büroleiters für die Weiterverarbeitung generiert werden. Das bedeutet, wir reichern zunächst die Quelldaten an und filtern danach nicht benötigte Felder heraus.
Zunächst starten wir mit der Abfrage der REST-API, unseren Quelldaten. HTTP-Requests können wir in NiFi mit dem InvokeHTTP-Prozessor darstellen (1). Hierfür konfigurieren wir in dem Prozessor lediglich die HTTP-URL passend zu unserer REST-API.
Die Daten sehen wie folgt aus:
{
"id": 403,
"name": "Kilian",
"office_id": 2,
"prename": "Thankmar"
}
Wir sehen die bereits beschriebenen Ausgangsdaten.
Im Gegensatz zum ersten Schritt wird es nun deutlich interessanter: Der LookupRecord-Prozessor (2) kommt ins Spiel. Er soll unseren vorhandenen Datensatz nun mit weiteren Informationen aus einer MySQL-Datenbank anreichern.
Der Lookup-Prozessor ermöglicht es uns, Daten aus weiteren Tabellen oder Dokumenten mit unseren vorhandenen Daten aus dem vorigen Prozessor zu vereinen. Dies kann man sich ähnlich vorstellen wie ein "Join" in relationalen Datenbanken, nur eben über beliebige kompatible Datensysteme. Dabei kann es sich um eine weitere Datenbank, aber auch Dateien oder eine REST-API handeln.
Zunächst geben wir einen Record Reader und einen Record Writer an. Diese geben an, in welchem Format gelesen und geschrieben werden sollen. Da wir JSON-Daten aus der REST-API bekommen, ist hier ein JsonTreeReader ausgewählt. Wir wollen die Daten auch direkt wieder in JSON schreiben, daher ist als Record Writer der JsonRecordSetWriter gewählt. Bei beiden handelt es sich um Controller-Services, die, wie wir sehen werden, auch von weiteren Prozessoren genutzt werden.
Es gibt insgesamt 13 verschiedene Record Reader und Writer. Dabei werden übliche Formate wie JSON, Avro und CSV genauso unterstützt wie Grok oder eigene Formate. Reader und Writer können selbstverständlich unterschiedlich sein! Bei der dritten Property – dem Lookup-Service – handelt es sich wieder um einen Controller-Service. Hierbei wird definiert, wo nach weiteren Daten gesucht werden soll. In unserem Fall ist der DatabaseRecordLookupService gewählt, da die Daten in einer MySQL-Datenbank liegen. Es werden mehr als 20 verschiedene Lookup-Services unterstützt. Dabei kann auf eine REST-API, HBase, ElasticSearch, CSV und viele weitere Datenquellen zugegriffen werden.
Die letzten beiden spannenden Properties sind Result Record Path und key. Die erste gibt an, unter welchem JSON-Attribut die neuen Daten gespeichert werden sollen und die zweite Property key definiert, was das Schlüssel-Attribut aus den JSON-Daten ist. In diesem Beispiel ist es das Attribut office_id. Im angegebenen DatabaseRecordLookupService geben wir mit dem MySQL-LookUp-Controller-Service eine Datenbankverbindung an. Die Property Lookup Key Column legt die Spalte in der Datenbanktabelle fest, anhand derer die Join-Bedingung durchgeführt wird. Der Wert aus dem Attribut, welches als key festgelegt wurde, wird mit Werten in dieser Spalte verglichen. Sind die Werte gleich, wird der Datensatz zurückgegeben. Welche Spalten zurückgegeben werden sollen, kann in der Property Lookup Value Columns festgelegt werden.
Wenn wir den ersten LookupRecord-Prozessor starten, kommen folgende Daten zurück:
{
"id" : 403,
"name" : "Kilian",
"office_id" : 2,
"prename" : "Thankmar",
"office_details" : {
"city" : "Wiesbaden",
"street" : "Kreuzberger Ring 13",
"zip_code" : 65205
}
}
Wir sehen, dass zu unseren Quelldaten einige neue Attribute hinzugefügt wurden. Sie sind unter dem JSON-Attribut office_details eingruppiert, so wie es im Prozessor konfiguriert wurde. Nach dem gleichen Prinzip und mit fast identischer Konfiguration läuft auch der zweite LookupRecord-Prozessor (3) ab. Dieser führt den Join jedoch gegenüber einer MongoDB aus, aber erneut mit dem key office_id. Final sieht der Datensatz nun wie folgt aus:
{
"id" : 403,
"name" : "Kilian",
"office_id" : 2,
"prename" : "Thankmar",
"office_details" : {
"city" : "Wiesbaden",
"street" : "Kreuzberger Ring 13",
"zip_code" : 65205
},
"manager" : {
"_id" : "62e93b5de5e1940007f1882f",
"name" : "Jung",
"prename" : "Matthias",
"office_id" : 2
}
}
Die Quelldaten aus der REST-API wurde erfolgreich um die Informationen des Büros und des jeweiligen Leiters ergänzt.
Ausdünnen
Im letzten Schritt werden wir die angewachsene JSON-Struktur auf die benötigten Felder reduzieren. Dafür nutzen wir den QueryRecord-Prozessor (4). Mit diesem können SQL-Statements gegen "beliebige" (der passende Record Reader/ Record Writer muss existieren) Datenstrukturen ausgeführt werden. Wir konfigurieren zunächst die beiden bereits vorhandenen JSON-Record Reader und Writer.
Als nächstes geben wir die SQL-Abfrage als Query an:
SELECT name, prename, RPATH_STRING(office_details, '/city') as city, RPATH_STRING(manager, '/name') as manager_name, RPATH_STRING(manager, '/prename') as manager_prename FROM FLOWFILE
Interessant ist, dass als Tabelle das FLOWFILE genutzt werden muss. Die zu selektierenden "Spalten" können einfach über den Namen angesprochen werden, wenn sie auf der obersten Ebene liegen. Da unsere Struktur verschachtelt ist, muss teilweise auf die Funktion RPATH zurückgegriffen werden. Als Ergebnis erhalten wir den finalen Datensatz, der nun weiterverarbeitet werden kann.
{
"name" : "Kilian",
"prename" : "Thankmar",
"city" : "Wiesbaden",
"manager_name" : "Jung",
"manager_prename" : "Matthias"
}
War's das schon?
Mit diesem kleinen Dataflow konnten wir bereits viele Funktionen von Apache NiFi demonstrieren. Wir haben gesehen, wie Daten über verschiedenste Systeme in unterschiedlichen Formaten zusammengeführt werden können, wie HTTP-Request initiiert werden und wie wir SQL-Abfragen gegen JSON und andere Datenstrukturen tätigen können.