Apache Kafka – eine Schlüsselplattform für hochskalierbare Systeme

Geht es darum, Daten von A nach B zu bringen, hört man oft als erstes "Kafka". Und in der Tat wird Kafka immer mehr zu einem Standard für verschiedene Einsatzzwecke. Angefangen als Messaging-System macht es nun auch anderen datenlastigen Frameworks wie Apache Flume Konkurrenz, aber auch Streaming-Technologien.
Grund genug für jeden Entwickler und Architekten, sich mit den Grundlagen von Apache Kafka auseinanderzusetzen. Der große Zuspruch, den Apache Kafka sowohl in der Open Source-Gemeinde als auch bei Unternehmen erfährt, liegt sicher in verschiedenen Punkten begründet. Zum einen an der Kombination aus Vielseitigkeit und Einfachheit und zum anderen an der Möglichkeit der horizontalen Skalierung, die den Einsatz in modernen Clustersystemen möglich macht. Zunächst sollen hier einige Grundlagen und Herausforderungen verteilter Systeme dargestellt werden, um danach die besondere Art zu zeigen, wie Apache Kafka damit umgeht.
Warum benötigt man ein Messaging-System wie Kafka?
Nehmen wir an, wir wollen Nachrichten von System A an System B senden. Würden wir A und B direkt miteinander verbinden, z. B. indem der Nachrichtenempfänger eine REST-Schnittstelle bietet, die der Sender aufruft, werden wir in einem Produktivsystem schnell ernstzunehmende Herausforderungen bekommen:
Herausforderung 1: System B ist nicht erreichbar, z. B. aufgrund eines Wartungsfensters oder wegen Netzwerkproblemen. System A hat aber nicht die Kapazität, Nachrichten so lange zu puffern, bis B wieder verfügbar ist.
Herausforderung 2: System A könnte System B mit unzähligen Nachrichten überhäufen und System B damit in die Knie zwingen (DoS).
Herausforderung 3: System B erhält eine Nachricht von System A, beginnt mit der Verarbeitung und stürzt plötzlich ab. System A hingegen sieht die versendete Nachricht als verarbeitet an, obwohl dies nicht der Fall ist.
Um diese Probleme zu lösen setzt man Messaging-Systeme ein, die zwischen Sender und Empfänger platziert werden. In klassischen Messaging-Systemen wird dabei differenziert, ob der Nachrichtenkanal als Queue oder als Topic implementiert wird. Queues kann man sich wie eine Warteschlange in manchen Geschäften vorstellen: Alle Personen warten in einer Schlange vor mehreren Kassen. Sobald eine Kasse dazu bereit ist, wird der nächste Kunde aus der Schlange aufgerufen (und damit aus der Queue entfernt). Durch dieses System ist sichergestellt, dass jeder Kunde nur einmal zur Kasse gebeten wird.
Im Kontext verteilter Systemen wird der Sender einer Nachricht üblicherweise als Producer bezeichnet und der Empfänger als Consumer. Folgende Abbildung verdeutlicht, dass jede der drei gesendeten Nachrichten insgesamt nur einmal gelesen wird.
Das Pendant dazu ist das Topic, das nach dem Publish-Subscribe-Mechanismus funktioniert. Das entsprechende Bild wäre dazu z. B. ein analoger Fernsehsender: Jeder der an den Inhalten interessiert ist, kann zuschalten und die Inhalte konsumieren. Jede Nachricht geht also zu allen Consumern.
Kafka im Architektur-Überblick
Apache Kafka, ursprünglich von LinkedIn entwickelt, wurde 2011 zum Apache Incubator und wird seit 2012 von der Apache Software Foundation entwickelt und gepflegt. Benannt wurde das Framework nach dem Autor Franz Kafka. Jay Kreps, der Erfinder von Apache Kafka, schätzt die Werke von Kafka sehr und entschied sich deshalb für dessen Namen [1].
Was zunächst für eine neue Messaging-Technologie verwundert: Kafka weist keinerlei JMS-Konformität auf und ist vom Feature-Reichtum auch erst hinter gängigen JMS-Systemen anzusiedeln. Dafür stellt es in Sachen Datendurchsatz und Vielseitigkeit vergleichbare Systeme in den Schatten. Die Basis dafür sind seine Skalierbarkeit sowie seine Replikationsfähigkeit und damit Ausfallsicherheit. Diese Skalierbarkeit ist in Kafka so tief verankert, dass ein nicht-repliziertes Kafka-System aus technischer Sicht lediglich ein Kafka-System mit dem Replikationsfaktor "1" ist. Das ist auch der Grund, warum Kafka speziell durch den Big Data-Kontext bekannt wurde und Teil vieler gängigen Hadoop-Distributionen wurde, zum Beispiel der von Hortonworks und der von Cloudera [2].
Kafka: Technische Grundlagen
Kafka enthält aktuell vier Kern-APIs:
- Producer API: Erlaubt das Schreiben von Nachrichten
- Consumer API: Erlaubt das Lesen von Nachrichten
- Streams API: Erlaubt das Analysieren und Transformieren von Nachrichten
- Connect API: Erlaubt die Synchronisation zweier Datensysteme, z. B. einer relationalen Datenbank und Hadoops HDFS
Ein Topic in Kafka bezeichnet die logische Instanz, zu der Nachrichten gesendet werden können und von welcher gelesen werden kann. Nachrichten werden als Records bezeichnet. Widmen wir uns aber zunächst den Topics: Sie sind der eigentliche Kernbestandteil von Kafka. Haben wir zum Beispiel Sensordaten von Maschinen, die wir von A nach B bringen wollen, könnten wir ein Topic sensor_data erzeugen. Unser Topic besteht dabei aus mehreren Partitionen, die konkrete Anzahl ist konfigurierbar. Eine Erläuterung über den Sinn von Partitionen erfolgt später im Text. Im einfachen Fall haben wir einen Kafka Broker auf einem Server mit drei Partitionen.
Der Producer, hier die Maschinen, entscheiden welche Datensätze in welche Partition geschrieben werden. In einigen Anwendungsfällen kann das relevant sein, oft jedoch genügt es, die Partitionen mittels Round-Robin-Verfahren einfach gleichmäßig auszulasten.
Innerhalb der Partition ist eine Nachricht als Offset gekennzeichnet. Dies ist eine aufsteigende Zahl, die jede Nachricht innerhalb einer Partition eindeutig identifiziert. In Abb.6 wurden im Topic insgesamt 20 Nachrichten gespeichert. Die nächsten Nachrichten werden nun entweder mit dem Offset "7" in Partition 1 oder 3 gespeichert oder als "6" in Partition 2.
Kafka speichert die eingehenden Nachrichten übrigens immer auf der Festplatte, Kafka ist also kein "In-Memory"-System, aber dennoch hoch performant. Da die Festplattenauslastung steigt, je länger das System läuft, müssen Nachrichten von Zeit zu Zeit auch wieder gelöscht werden. Um den Speicherbedarf kontrollierbar zu halten, bietet Kafka drei Cleanup Policies:
- Retention-Time: Die ältesten Daten werden frühestens nach einer frei definierbaren Zeit gelöscht.
- Retention-Size: Die ältesten Daten werden frühestens gelöscht, sobald der Speicherbedarf der Nachrichten eine definierte Größe erreicht hat.
- Log-Compaction: Es werden nur Nachrichten gelöscht, deren Schlüssel mehrfach vorkommt und niemals die neueste Nachricht mit dem Schlüssel.
Standardmäßig wird die Retention-Time verwendet und Nachrichten werden nach sieben Tagen gelöscht.
Zusammenspiel der Komponenten in Kafka: Producer, Topic und Consumer
Um Nachrichten aus dem Topic zu lesen wird die Consumer API verwendet. Ein wichtiges Faktum ist hier, dass die Nachrichten nicht per Push-Verfahren an die Consumer geliefert werden, sondern dass die Consumer die Nachrichten aktiv beim Topic abholen. Dies ist auch bei anderen Messaging-Systemen gängige Praxis, da der Consumer auf diese Weise die Nachrichten in der Geschwindigkeit lesen kann, in der er diese auch verarbeiten kann. Consumer sind außerdem in der Lage, nicht nur die neueste, nicht gelesene Nachricht abzuholen, sondern jede Nachricht die sich noch in der Partition und damit auf der Festplatte befindet, und die noch nicht durch die Cleanup Policies entfernt wurde.
Das wichtigste Thema dabei ist die Entscheidung, wie die Nachrichten analog der anfangs definierten Differenzierung zu behandeln sind: Sollen die Nachrichten eher queue- oder eher topic-artig werden? Kafka bietet hier mit dem Konzept der Consumer Groups beide Möglichkeiten.
Jeder Consumer kann zu einer Consumer Group gehören. Dabei wird jede Nachricht im Kafka Topic von einer Consumer Group nur insgesamt einmal gelesen. Sprich: Haben zwei Consumer die gleiche Consumer Group und lesen beide aus dem gleichem Topic, lesen beide niemals die gleiche Nachricht, sondern immer verschiedene Nachrichten. In der Praxis sind diese beiden Consumer keine unterschiedlichen Programme, sondern z. B. zwei Threads eines Programmes oder ein auf einem Hadoop-Cluster deployter Job. Ein typischer Anwendungsfall wäre hier die Lambda-Architektur [3], bei der alle Daten sowohl vom Speed-Layer als auch vom Batch-Layer für die In-Memory-Verarbeitung gelesen werden.
Abb.7 verdeutlicht die Verteilung der Nachrichten (symbolisiert durch die drei Briefumschläge) an zwei Consumer-Applikationen die in jeweils zwei Threads laufen. Dabei hat jede Applikation eine eigene Consumer Group.
Damit kommen wir nun auch zurück zu den Partitionen. Wir erinnern uns: Ein Topic besteht aus mehreren Partitionen und Producer bestimmen die Verteilung der Daten zwischen den Partitionen. Da wir nun wissen, dass es mehrere Consumer innerhalb einer Consumer Group gibt (es sei denn, man implementiert eine nicht geclusterte Single-Thread-Applikation), stellt sich die Frage, aus welchen Partitionen die Consumer lesen. Ein Consumer kann aus mehreren Partitionen lesen, eine Partition hingegen kann nur von exakt einem Consumer einer Consumer Group gelesen werden.
Betrachten wir das Beispiel in Abb.8. Wir haben ein Topic mit drei Partitionen und eine Applikation, die in vier Threads läuft, welche sinnvoller Weise alle die gleiche Consumer Group haben (da die Applikation andernfalls Nachrichten mehrfach aus dem Topic abholen würde). Da wir nun aber nur drei Partitionen haben, wird Thread 4 nicht mehr bedient und erhält somit keine Daten. Analog verhält sich der umgekehrte Fall: Hätten wir hier nur zwei Threads, würde ein Thread die Daten aus Partition 1 bekommen und der andere die Daten aus Partition 2 und 3. Letzterer würde also – vorausgesetzt der Producer hat die Daten gleichmäßig über die Partitionen verteilt – doppelt so viele Daten bekommen wie der andere. Kafka hat hierfür eine Lösung parat: Es ist möglich, zur Laufzeit die Anzahl der Partitionen eines Topics zu erhöhen.
Replikation und Skalierung mit Kafka
Eingangs wurde erwähnt, dass Replikation und Skalierung die Kernkonzepte von Kafka sind – und doch wurden beide bis jetzt noch nicht erwähnt. Der Grund: Um diese zu verstehen ist es wichtig, erst die bisher genannten Grundkonzepte verinnerlicht zu haben. Die Konstellation der letzten Abbildung mit den drei Partitionen und vier Threads soll nun skaliert werden. Wir wollen dafür nicht mehr nur einen Kafka Broker verwenden, sondern zwei. Wir setzen darüber hinaus den Replication Factor auf "2", was dafür sorgt, dass jede vorhandene Nachricht über exakt zwei Broker verteilt ist. Das Ergebnis dieser Skalierung Abb.9.
Durch den Replication Factor wird dafür gesorgt, dass die Partitionen redundant, also in diesem Fall jeweils auf beiden Brokern gespeichert werden. In den vorherigen Grafiken war außerdem jede Partition immer automatisch der "Leader". Durch die Replikation gibt es jetzt auch "Follower". Für Partition 1 ist das Broker 1 und für Partition 2 und 3 Broker 2. Sämtliche Lese- und Schreibvorgänge werden ausschließlich mit dem Leader gemacht. Will also der Producer Daten in Partition 2 schreiben, schreibt er das in die Partition 2 im Broker 2 und in keinem Fall in Broker 1. Die einzige Verantwortlichkeit der Follower liegt in der Replikation der Daten vom jeweiligen Leader. Fällt Broker 1 aus, würde Kafka feststellen, dass es keinen Leader mehr für Partition 1 gibt und würde daraufhin einen neuen Leader bestimmen – in diesem Fall bleibt nur die Möglichkeit, Broker 2 für Partition 1 als Leader zu bestimmen. Jetzt gibt es verschiedene Ausfallszenarien und Synchronisationsprobleme, die potenziell eintreten, in diesem Artikel allerdings nicht weiter ausgeführt werden können. Zum Beispiel könnte es passieren, dass der Producer Daten in Partition 1 schreibt und – bevor die Replikation durchgeführt wird – abstürzt und nicht mehr erreichbar ist. Dann wäre die gesendete Nachricht verloren. Für dieses Risiko ist der Producer verantwortlich. Per Konfiguration kann diesem mitgeteilt werden, ab wann eine Nachricht als erfolgreich gesendet angesehen werden soll. Für diesen Fall greifen drei Optionen:
- Ab sofort, unabhängig ob der Kafka Broker überhaupt erreicht wurde.
- Sobald die Nachricht beim ersten Broker angekommen ist.
- Sobald die Nachricht entsprechend dem eingestelltem Replication Factor repliziert wurde.
Durch die letzte Einstellung würde der Producer die Nachricht so lange als "nicht erfolgreich gesendet" ansehen, bis die gewünschte Redundanz im Kafka Cluster vorhanden ist.
War es das schon?
Nein! Auch wenn Kafka oft als ein "einfaches" System angesehen wird, gibt es noch viele weitere Herausforderungen, denen es begegnen muss. Betrachtet man die Dokumentation von Kafka [4], fallen schnell weit über 100 Properties auf, die man konfigurieren kann, um verschiedene Themen wie Security und Skalierung feingranularer zu steuern. Der Fokus dieses Beitrags liegt auf den Grundkonzepten von Kafka sowie auf Producer und Consumer. Die beiden anderen APIs, die Streams API und die Connector API, basieren auf dem gleichen Grundkonzept: dem Topic. Nur werden dort keine Producer und Consumer mehr verwendet.
Kafka hat seit seinem Open Source-Start im Jahr 2011 eine enorme Verbreitung in modernen, datenlastigen IT-Systemen erreicht und wird diese aller Wahrscheinlichkeit nach weiter ausbauen. Die Kafka Connector API wird bisherigen Standards wie Apache Flume weiterhin ernstzunehmende Konkurrenz machen und diese möglicherweise vom Markt verdrängen. Mit der Streams API drängt Kafka nun auch immer stärker in das Thema Stream Processing und macht damit Technologien wie Storm und Spark Streaming ganz erhebliche Konkurrenz.
Ein Team um Jay Kreps hat das Unternehmen Confluent [5] gegründet, das eine ganze Streaming-Plattform um Kafka entwickelt hat, bei der mit Hilfe der SQL-Engine KSQL Streaming-Daten in SQL-Syntax analysiert werden, ohne eigenen Programmcode in Java oder Python schreiben zu müssen. Betrachtet man außerdem die hochaktive Open Source-Community von Kafka auf Github [6], kann die Wahrscheinlichkeit als hoch angesehen werden, dass Kafka in einigen Jahren zu einem De-facto-Standard für datengetriebene Systeme wird.