Integration von Big Data und relationaler Datenbankwelt
Für analytische Anwendungen haben sich neben den traditionellen Ansätzen mit relationalen Datenbanken in den letzten Jahren neue sogenannte Big Data-Technologien etabliert. Bei diesen Technologien wird unterschieden, ob die Daten schon analysiert werden, bevor sie auf persistenten Speicher abgelegt werden (data in motion) oder ob sie erst nach der Speicherung untersucht werden (data at rest). Ersteres wird dann gemacht, wenn die Datenmenge vor der Speicherung aus Kapazitätsgründen zuerst verdichtet werden soll oder wenn es notwendig ist, die Ergebnisse der Analysen der Daten sofort nach ihrer Erzeugung zu haben und man es sich aus Antwortzeitgründen nicht leisten kann, die Daten vorher zu speichern. Im Folgenden soll aber der Fokus auf Technologien für data at rest gelegt werden. Bei diesen Big Data-Technologien wird aus Skalierbarkeitsgründen meist ein verteilter Ansatz gewählt und die Speicherung und Verarbeitung von semi-strukturierten oder unstrukturierten Daten wird meist besser unterstützt als bei traditionellen relationalen Systemen durch BLOBs (Binary Large Objects).
Damit stellt sich natürlich die Frage, ob Programme, Daten und Methoden aus der relationalen Welt auch für und mit der Big Data-Welt genutzt werden können. Das soll in diesem Artikel untersucht werden.
Ein verbreiteter Big Data-Ansatz basiert auf den Open Source-Produkten des Apache Hadoop-Framework [1]. Die Daten werden dabei häufig in dem Hadoop Distributed File System (HDFSTM) gespeichert, einem verteilten Dateisystem, das neben der verteilten Speicherung zur Performance-Steigerung auch die redundante Speicherung zur Erhöhung der Verfügbarkeit unterstützt. Aber auch andere parallele und verteilte Dateisysteme wie etwa IBM Spectrum Scale (auch als GPFS bekannt) können benutzt werden.
Das ursprüngliche Framework für verteilte Berechnungen und Auswertungen im Hadoop-Umfeld war MapReduce [2]. Dabei erfolgte die Verarbeitung in zwei Phasen, zunächst der Map-Phase und dann der Reduce-Phase. Dazu werden in einem ersten Schritt die Eingangsdaten in Teilpakete zerteilt, für die jeweils unabhängig voneinander und vollständig parallel die Verarbeitung der Map-Phase durchgeführt werden kann. Das Ergebnis der Map-Phase wird dann sortiert und als Eingabe für die Verarbeitung in der Reduce-Phase benutzt. Die Eingabe jeder Phase wird aus dem Dateisystem gelesen und das Ergebnis jeder Phase wird wieder in das Dateisystem geschrieben. Das zeigt auch den Nachteil dieses Ansatzes, nämlich, dass sehr viel IO erfolgt. Der Vorteil des MapReduce-Ansatzes und der Grund für seinen initialen Erfolg war die einfache Parallelisierung von verschiedenen Aufgaben. Das Spark-Framework versucht den Nachteil des MapReduce-Ansatzes zu vermeiden, dass Zwischenergebnisse der einzelnen Verarbeitungsschritte auf persistenten Speicher abgelegt werden [3]. Für Spark gibt es Bibliotheken, um Advanced Analytics-Probleme wie Machine Learning effizient zu lösen.
Gleichzeitig haben sich auch relationale Datenbankprodukte weiterentwickelt, um Advanced Analytics-Aufgaben performant angehen zu können. So besteht in Db2 Warehouse (auch bekannt als dashDB) [4], einer relationalen Datenbank optimiert für analytische Zwecke, die Möglichkeit, Machine Learning-Algorithmen direkt in der Datenbank als vordefinierte Stored Procedures auszuführen. Dies bietet den Vorteil, dass die Algorithmen direkt bei den Daten ausgeführt werden können, ohne dass vorher groß Daten bewegt werden müssen. Es gibt auch die Möglichkeit, diese Stored Procedures aus Programmiersprachen wie R zu benutzen.
Ein Standard, der sowohl in der relationalen Welt als auch in der Hadoop-Welt zum Zugriff und zur Abfrage auf Daten genutzt wird, ist SQL. Es bietet sich damit als gemeinsame Schnittstelle für Zugriffe auf beide Welten an. Spark als Framework, das sowohl die effiziente Verarbeitung von strukturierten Daten als auch wesentlich einfacher (nämlich durch Erstellen geeigneten Codes in einer der unterstützten Programmiersprachen wie etwa Scala oder Python) auch die Verarbeitung von semistrukturierten und unstrukturierten Daten erlaubt, bietet sich als zweite Möglichkeit der Integration an.
Ein Produkt, das es in der Hadoop-Welt schon sehr lange gibt, um mit einer SQL-artigen Sprache – HiveQL – auf in HDFS gespeicherte Daten zuzugreifen und diese zu analysieren ist Hive [5]. Hive ist aber bzgl. des SQL-Sprachumfangs beschränkt im Vergleich zu bei Data Warehousing-Projekten üblicherweise eingesetzten relationalen Datenbanksystemen wie Db2. Deshalb gibt es mit Big SQL von IBM ein Produkt, das den vollen Sprachumfang wie Db2 hat und es aber gleichzeitig ermöglicht, dieses SQL auch auf in Hadoop liegende Daten anzuwenden [6]. Möchte man mit Big SQL eine Tabelle auf HDFS anlegen, so benutzt man wie unten zu sehen das CREATE HADOOP TABLE-Statement:
create hadoop table users ( id int not null primary key, office_id int null, fname varchar(30) not null, lname varchar(30) not null) row format delimited fields terminated by '|' stored as textfile;
Es wird eine Tabelle users mit den vier Spalten id, office_id, fname und lname angelegt. Eine Constraint wie die angegebene Primary-Key-Constraint kann angegeben werden, wird aber nicht erzwungen. Man ist also selbst dafür verantwortlich, dass id für jede Row verschiedene Werte enthält. Es macht aber trotzdem Sinn, Constraints anzugeben, da der Big SQL-Optimierer diese Information bei der Erzeugung des Query-Plans nutzt. Die Klauseln nach der schließenden Klammer geben an, wie die Daten zu speichern sind, nämlich in einem zeilenorientierten Format, wobei das Pipe-Symbol als Terminator für die einzelnen Spalten benutzt wird. Man hätte beim ROW FORMAT alternativ auch eine SerDe-Klasse angeben können. Es werden verschiedene Dateiformate unterstützt, unter anderem auch ORC, Avro und Parquet.
Wenn man eine Tabelle mit CREATE HADOOP TABLE anlegt, dann ist diese zunächst leer. Möchte man dagegen existierende Dateien als Tabelle nutzen, kann man stattdessen die CREATE EXTERNAL HADOOP TABLE-Klausel mit sonst gleicher Syntax benutzen, um diese existierende Datei als Tabelle sichtbar zu machen. Wird auf einer external Tabelle ein DROP durchgeführt, werden die Metadaten für diese Tabelle (einschließlich der Informationen im Hive Catalog) gelöscht, die eigentliche Datei mit ihrem Inhalt existiert aber weiter.
Big SQL nutzt auch den Hive Catalog, so dass auf Big SQL-Tabellen auch mit anderen Werkzeugen wie Hive zugegriffen werden kann. Insbesondere macht sich der Anwender damit durch den Einsatz von Big SQL nicht von Big SQL abhängig, sondern kann jederzeit, wenn er die Vorteile von Big SQL – wie deutlich bessere Performance und größeren SQL-Sprachumgang – nicht länger nutzen möchte, zu Hive zurückkehren.
Eine so angelegte Tabelle, sei sie nun external oder nicht, kann nun gegebenenfalls mit weiteren so definierten Tabellen in anderen SQL-Statements wie z. B. einem SELECT benutzt werden, wie man es von relationalen Datenbanksystemen gewohnt ist.
Big SQL unterstützt auch Tabellen, deren Daten direkt auf S3 Object Storage liegen. Diese können durch die Angabe einer LOCATION-Klausel wie folgt definiert werden:
CREATE HADOOP TABLE staff ( … ) LOCATION 's3a://s3atables/staff';
Auch eine partitionierte Tabelle kann auf Object Storage angelegt werden. Damit ist es möglich, Abfragen direkt auf kostengünstigen Object Storage durchzuführen ohne die Daten vorher nach HDFS kopiert haben zu müssen. Die Performance beim Zugriff auf Tabellen, die auf Object Storage liegen, ist damit im Allgemeinen natürlich auch deutlich geringer.
Auch der Zugriff auf Daten in einem anderen Hadoop-System ist möglich, sofern dieses WebHDFS unterstützt. Auch hier gilt aber natürlich, dass der Zugriff auf Tabellen in einem anderen Hadoop-System normalerweise langsamer ist, als wenn man auf das lokale Hadoop-System zugreift. Eine Tabelle mit Zugriff über WebHDFS kann wie folgt definiert werden:
CREATE HADOOP TABLE staff ( … ) PARTITIONED BY (JOB VARCHAR(5)) LOCATION 'webhdfs://namenode.acme.com:50070/path/to/table/staff';
Big SQL führt beim Zugriff auf diese Daten dann einen REST-Aufruf durch (s. Abb.1).
Darüber hinaus bietet Big SQL auch die Möglichkeit, Daten in HBase als Tabellen zu benutzen. HBase ist eine Column-Family NoSQL-Datenbank. Die Tabellen werden dazu mit CREATE HBASE TABLE definiert:
CREATE HBASE TABLE BIGSQLLAB.REVIEWS ( REVIEWID varchar(10) primary key not null, PRODUCT varchar(30), RATING int, REVIEWERNAME varchar(30), REVIEWERLOC varchar(30), COMMENT varchar(100), TIP varchar(100) ) COLUMN MAPPING ( key mapped by (REVIEWID), summary:product mapped by (PRODUCT), summary:rating mapped by (RATING), reviewer:name mapped by (REVIEWERNAME), reviewer:location mapped by (REVIEWERLOC), details:comment mapped by (COMMENT), details:tip mapped by (TIP) );
Ein wesentlicher Punkt ist dabei das Column Mapping. Eine Column-Family-Datenbank wie HBase nutzt dabei eine Struktur, die aus einem Schlüssel und einer oder mehreren Column-Families besteht. Die Column-Families bestehen dabei wieder aus einer Menge Schlüssel/Wert-Paaren, wobei der Schlüsselteil durch Column-Family und Column-Qualifier gegeben ist. Das Column Mapping legt fest, wie die Abbildung von den HBase-Strukturen auf die relationalen Spalten erfolgen soll. Die Zeile key mapped by (REVIEW_ID) legt fest, dass der Schlüssel aus HBase auf die Spalte REVIEW_ID der relationalen Tabelle abgebildet werden soll. Die folgenden Zeilen legen fest, wie die im Beispiel vorhandenen Column-Families auf die Tabellenspalten abgebildet werden. Der Name vor dem Doppelpunkt ist die Column-Family, der nach dem Doppelpunkt ist der Column-Qualifier.
Big SQL erlaubt zugleich den föderierten Zugriff auf traditionelle relationale Datenbanksysteme wie Db2, Teradata, Oracle, Netezza, Informix, MS SQL Server. Umgekehrt ist von Db2 auch der Zugriff auf Big SQL-Tabellen möglich. Auf diese Weise können über gemeinsames SQL Zugriffe auf Daten in relationalen Datenbanksystemen und verschiedene in Hadoop gespeicherte Daten erfolgen.
Viele Verarbeitungen und Analysen im Big Data-Umfeld erfolgen durch das Spark-Framework. Es verfolgt einen funktionalen Ansatz, der dazu führt, dass Programme leicht parallelisierbar und damit leicht skalierbar sind. Spark bietet auch für verschiedenste Zwecke schon fertige Bibliotheken – unter anderem auch für Machine Learning an. Um von den Fähigkeiten von Spark im Zusammenhang mit der Analyse von Daten in einem traditionellen Data Warehouse oder in einem Hadoop-System effizient Gebrauch zu machen, ist es erforderlich, eine performante Kommunikation zwischen Spark und diesen Systemen zu haben. Dabei sind zwei Richtungen zu unterscheiden: Zum einen das Lesen von Daten durch Spark und zum anderen die Rückgabe von (großen) Ergebnissen, die mit Spark berechnet wurden, an das Data Warehouse oder an Hadoop.
Das Lesen von Daten aus Big SQL durch Spark ist ähnlich wie der Zugriff auf andere relationale Datenquellen durch Spark:
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; … Dataset<Row> tableDf = sqlCtx.read() .format("jdbc") .option("driver", "com.ibm.db2.jcc.DB2Driver") .option("url","jdbc:db2://server1.foo.bar.com:32051/BIGSQL") .option("user", "aw") .option("password", "awpwd") .option("dbtable", "myschema.mytable") .load(); tableDf.createOrReplaceTempView("mytable"); Dataset<Row> queryDF = spark.sql("SELECT col2, col3 FROM mytable WHERE col1 > 100");
Interessanter ist, wie die Ergebnisse einer Spark-Berechnung in Big SQL verwendet werden können. Dies geschieht durch sogenannte polymorphe Tabellenfunktionen. Tabellenfunktionen in Big SQL können wie physikalische Tabellen in der FROM-Klausel eines SELECT-Statements benutzt werden. Zur Kommunikation mit Spark wird dabei die execspark-Tabellenfunktion benutzt. Ein Beispiel:
SELECT SUM(range.value) FROM TABLE(execspark( class => 'com.ibm.bigsql.Range', num => 100 )) AS range;
In diesem Beispiel wird eine Spalte mit Namen value zurückgeliefert, über die dann im SELECT-Statement summiert wird. Der Funktion execspark wird dazu eine Klasse (Range) und ein Parameter dafür (num) mitgegeben. Woher weiß man aber etwa, dass die Spalte value heißt? Schauen wir dazu auf die Implementierung der Klasse Range:
import com.ibm.biginsights.bigsql.spark.SparkPtf class Range extends SparkPtf { override def describe (ctx: SQLContext, args: Map[String, Object]): StructType = { // The result has a single column of type Long, named VALUE return StructType(Array(StructField("VALUE", LongType, true))) } override def execute (ctx: SQLContext, args: Map[String, Object]): DataFrame = { val val num = arguments.get("NUM").asInstanceOf[Int] // Use SQLContext.range method to generate our result return ctx.range(num) } override def destroy (ctx: SQLContext, args: Map[String, Object]):Unit={} override def cardinality (ctx: SQLContext, args: Map[String, Object]): Long = { val val num = arguments.get("NUM").asInstanceOf[Int] return num } }
Die Methode describe wird vom Query-Compiler benutzt, um eine Beschreibung der zurückgegebenen Spalten zu erhalten. Hier sieht man auch im Beispiel, dass nur eine Spalte zurückgegeben wird, diese VALUE heißt und vom Type Long ist. Die Methode execute wird dann zur Laufzeit aufgerufen, um die eigentliche Spark-Berechnung durchzuführen. Diese Methode kann auch benutzt werden, um Datenquellen in Big SQL einzubinden, die zwar Spark unterstützt (DataSources API), Big SQL aber nicht direkt, wie man im folgenden Beispiel sieht:
SELECT result.col1, …, result.coln FROM TABLE(execspark( class => 'com.ibm.bigsql.DataSource', format => ‘…’ option1 => value1, option2 => value2, option3 => value3, … load => <table or query> )) AS result
Hier sieht man, wie konkret der Zugriff auf eine MySQL-Tabelle erfolgen kann, die dann in Big SQL gefiltert wird:
SELECT people.firstname, people.lastname, people.phone FROM TABLE(execspark( class => 'com.ibm.bigsql.DataSource', format => ‘jdbc’, url => ‘jdbc:mysql://localhost/mydb’, driver => ‘com.mysql.jdbc.Driver’, user => ‘aw’, password => ‘awpwd’ load => ‘default.people’ )) AS people WHERE people.age > 55
Auf diese Art kann Spark genutzt werden,
- um Dateien (Parquet, JSON, CSV) zu lesen,
- um den Zugriff auf NoSQL-Quellen (z. B. Cassandra) zu realisieren, die Big SQL nicht direkt unterstützt,
- um Transformationen und Aggregationen durchzuführen, die nur schwierig in SQL zu erledigen sind,
- um Spark-Bibliotheken wie ML zu nutzen oder
- um Spark als Cache für Ergebnisse zu verwenden.
Dies ist möglich, da eine langlaufende Spark-Anwendung, die von Big SQL gesteuert wird, benutzt wird. Dabei sind die jeweiligen Spark Executors mit den Big SQL-Workers auf dem gleichen Knoten zusammengespannt, um den Kommunikationsaufwand zu minimieren. Das folgende Bild verdeutlicht dies:
Der Datenaustausch kann damit innerhalb der Knotengrenzen und deshalb sehr performant erfolgen.
Eine derartige Architektur mit Datenbank-Worker, lokaler Kommunikation und zugehörigem Spark Executor wird aber auch in einer rein relationalen Datenbank – etwa bei Db2 Warehouse – für analytische Workloads genutzt, um effizient Spark-Programme einsetzen zu können.
Zusammenfassend kann man sagen, dass durch Technologien wie sie Big SQL bietet und wie sie hier vorgestellt wurden, eine einfache und performante Integration von relationaler und Hadoop-Welt möglich ist und dass neue Verarbeitungs- und Analysemethoden wie sie etwa Spark bietet, in beiden Welten genutzt werden können.