Samstag, 25. Mai 2024

Cloud Serie: Streaming mit Apache Beam und GCP Dataflow - Transaktions-Monitoring mit ESC als Stream

In der heutigen datengetriebenen Welt sind Streaming-Anwendungen unverzichtbar geworden, um kontinuierliche Datenströme in Echtzeit zu verarbeiten. Die Welt des Streaming kann jedoch komplex und herausfordernd sein, insbesondere wenn es darum geht, robuste und skalierbare Lösungen zu entwickeln. Apache Beam ist ein leistungsfähiges Framework, das die Entwicklung von Batch- und Streaming-Datenpipelines erleichtert. Apache Beam abstrahiert die zugrundeliegende Verarbeitungslogik und ermöglicht es Entwicklern, sich auf die Geschäftslogik zu konzentrieren, anstatt sich mit den Details der Implementierung auseinanderzusetzen. Trotz der Abstraktion der Implementierung von der eigentlichen Ausführungslogik ist ein gewisses Verständnis der theoretischen Grundlagen von Streaming-Anwendungen notwendig, um Apache Beam besser zu verstehen und damit bessere Anwendungen zu schreiben. Aus diesem Grund werde ich im Folgenden zunächst einige theoretische Aspekte und Anwendungsbeispiele von Streaming-Anwendungen vorstellen. Danach gehe ich näher darauf ein, was Apache Beam genau ist und wie man damit Streaming- und/oder Batch-Anwendungen programmieren kann. Abschliessend zeige ich, wie man eine Apache Beam Anwendung in der Google Cloud Platform (GCP) mittels Dataflow hochverfügbar und skalierbar ausführen kann und welches mögliche Anwendungsfälle sein könnten.

Ein bisschen Streaming-Theorie

Streaming wird in der Softwareentwicklung häufig in verschiedenen Kontexten verwendet. Viele kennen das Lesen von Dateien als Stream, das Lesen einer Zeichenkette als Stream oder die Verarbeitung einer bekannten HTTP-Anfrage als Stream. In all diesen Fällen sind die "Ströme" jedoch endlich und ähneln eher einer Batch-Verarbeitung, bei der eine vorgegebene oder gut bekannte Menge von Daten verarbeitet wird. Wenn wir jedoch über Streaming im Zusammenhang mit echten Streaming-Anwendungen und Frameworks wie Apache Beam sprechen, ist das etwas anders. Hier bedeutet Streaming einen theoretisch unendlichen Datenstrom, von dem man nicht weiss, ob und wann er aufhört. Diese Datenströme sind kontinuierlich und dynamisch, was bedeutet, dass Daten jederzeit in unvorhersehbarer Reihenfolge eintreffen können. Im Gegensatz zu einem Stream aus einer Datei, der geordnet und abgeschlossen ist, sind solche kontinuierlichen Streams oft ungeordnet, d.h. ältere Daten können später im Stream auftauchen. Dies stellt erhöhte Anforderungen an ein Verarbeitungssystem. Dies insbesondere dann, wenn aus einem solchen Datenstrom Echtzeitanalysen durchgeführt werden sollen. Die Anforderungen steigen weiter, wenn die Abarbeitung bestimmter Verarbeitungsschritte zusätzlich parallelisiert werden soll. Diese Komplexität muss man sich vor Augen halten, wenn man sich fragt, warum man für eine Streaming-Anwendung überhaupt ein spezielles Framework wie Apache Beam verwenden soll.

Schauen wir uns diese Herausforderungen anhand eines konkreten Beispiels genauer an. Wir wollen auf unserer Website in Echtzeit die 10 Begriffe anzeigen, die in den Artikeln der klassischen Medien und der sozialen Medien am häufigsten verwendet werden. Was auf den ersten Blick banal erscheint, erfordert bei näherer Betrachtung mehrere Komponenten und mehrere Entscheidungen.

Wir brauchen verschiedene Gatherer, um die verschiedenen Datenquellen zu erschliessen. Solche, die neue Artikel von den gewünschten Medienportalen beziehen und solche, die Posts von den gewünschten sozialen Medien wie X (früher Twitter), Facebook, Instagram etc. beziehen. Diese Gatherer stellen die Informationen asynchron über Pub/Sub zur Verfügung. Hier stellt sich bereits die erste Frage: Liefert der Gatherer bereits eine extrahierte Wortliste oder den ganzen Artikel? Unsere Streaming-Anwendung abonniert dann das entsprechende Pub/Sub-Topic, um die Wortlisten oder Artikel zu erhalten. Die nächste Frage ist: Welcher Zeitraum soll abgedeckt werden? Die 10 am häufigsten verwendeten Begriffe in welchem Zeitraum? Ist die Hitliste erstellt, muss sie irgendwie gespeichert werden. Das führt zur nächsten Frage: Was macht die Streaming-Anwendung mit dem Ergebnis? Speichert sie es irgendwo klassisch in der Datenbank oder stellt sie das Ergebnis wieder in einem Pub/Sub-Topic zur Verfügung? Schliesslich muss die aktuelle Trefferliste auf die Website gelangen. Hier stellt sich die Frage, wie die Aktualisierung erfolgen soll. Gibt es einen Webhock und die Hitliste wird per Push an die Website gesendet oder sendet die Website regelmässige Pull-Requests an ein Backend, um die aktuelle Hitliste abzufragen? Fragen über Fragen und genug Architektur-Stoff, um weitere Artikel darüber zu schreiben. An dieser Stelle konzentrieren wir uns auf die eigentliche Streaming-Anwendung und ihre Herausforderungen. Wir gehen davon aus, dass die Gatherer ganze Artikel in Pub/Sub einstellen und die Hitliste jeweils die letzten 30 Sekunden darstellen soll. Unser grober Ablauf könnte dann wie folgt aussehen:

  1. Pub/Sub-Message trifft ein
  2. Artikel in die einzelnen Worte zerlegen
  3. Worthäufigkeiten des Artikels/Posts berechnen
  4. Aktuelle Worhäufigkeiten über alle Artikel/Posts aktualisieren
  5. Nach dem Zeitfenster von 30 Sekunden die aktuelle Worthäufigkeit der Top 10 in Pub/Sub einstellen (wir nehmen diese Lösung)
Wir halten es hier einfach. Der zweite Punkt könnte noch vertieft werden mit der Frage, ob die Wörter 1:1 übernommen werden sollen oder ob es nicht sinnvoller ist, die Wörter zu normieren. Also bspw. einen Stemmer zu verwenden und zusätzlich alle Buchstaben klein zu schreiben. Diese Normierung könnte in weitere Einzelschritte unterteilt werden. Das Thema Sprache der Artikel/Postings haben wir bewusst nicht angesprochen, der Umgang damit wäre sicher auch noch zu klären. Wir haben nun ein ungefähres gemeinsames Bild vom Ablauf unserer Streaming-Anwendung, so dass wir uns nun den konkreten Herausforderungen widmen können.

Mit der allgemeinen Worthäufigkeit und dem Zeitfenster (30 Sekunden) haben wir unbewusst bereits eine wichtige Entscheidung für die Streaming-Anwendung getroffen. Nämlich die Frage, wie wir das Fenster definieren. Sehen wir uns dazu das folgende Beispiel an:

Abb. 1: Verschiedene Typen von Informationen auf der Zeitachse

Wir haben einen Stream mit drei Arten von Informationen: A, B und C. Für die Verarbeitung haben wir zwei grundlegende Optionen: 1) Jedes Element einzeln. 2) Elemente in Gruppen. Für unseren Anwendungsfall mit Worthäufigkeiten kommt die Option 1 nicht in Frage, das wäre sinnlos, da wir dann nicht die Häufigkeiten aller Artikel hätten, sondern nur die Häufigkeiten von jeweils einem Artikel. In der Grundoption 2 gibt es jedoch mehrere Möglichkeiten. Wir können das Fenster nach einer bestimmten Zeit fixieren. Zum Beispiel unsere 30 Sekunden. Wir können das Fenster aber auch flexibel nach Typ definieren. Ein Fenster wäre immer so lange, wie der gleiche Typ kommt. Also ein Fenster für A, B und C. Die folgende Abbildung zeigt dies:

Abb. 2: Fenster nach Zeit (oben) und nach Typ (unten) - selbstverständlich gibt es noch weitere Trennmöglichkeiten, bspw. nach Anzahl, auf diese gehen wir hier nicht weiter ein

Im ersten Fall haben wir ein fixes Fenster nach Zeit. Das heisst im ersten Fenster haben [A, A] im zweiten [B], im dritten [C, A] im vierten [B], im fünften [B] und im sechsten [C]. Im zweiten Fall haben wir dynamische Fenster nach Typ. Das heisst im ersten Fenster [A, A], im zweiten [B], im dritten [C], im vierten [A], im fünften [B, B] und im sechsten [C].

Wenn wir in einer Streaming-Anwendung Informationen in Informationskörben verarbeiten, müssen wir entsprechend definieren, wann ein solcher Korb endet. Wie wir wissen, ist der Stream (theoretisch) unendlich. Wir brauchen also Kriterien, wann wir einen solchen Korb zusammen haben, um ihn als Ganzes verarbeiten zu können. Welche Art von "Trennung" die beste ist, hängt vom konkreten Anwendungsfall ab. In unserem Fall scheint ein festes Zeitfenster am besten zu sein. Wir wollen ja alle Artikel/Posts gleich behandeln und nur eine Aussage über die aktuellen Top 10 Wörter machen. Damit kommen wir zu einem weiteren wichtigen Unterscheidungskriterium bei Streaming-Anwendungen und der Herausforderung, wie wir damit umgehen: Welche Zeit nehmen wir? Sie erinnern sich: Wir haben verschiedene Gatherer, die Artikel und Posts aus verschiedenen Quellen sammeln. Die obige Zeitleiste zeigt, wann diese Informationen in unser Streaming-System einfliessen. Wir müssen uns darüber im Klaren sein, dass nicht alle Artikel/Posts, die in einer bestimmten Reihenfolge in unser System gelangen, auch in dieser Reihenfolge veröffentlicht werden. Das kann von verschiedenen Faktoren abhängen: Die Sammler arbeiten unterschiedlich schnell. Die Medien veröffentlichen die Artikel nicht alle gleich schnell, oder es dauert unterschiedlich lange, die Artikel zu erhalten. Schliesslich speisen die Gatherer diese Informationen in Pub/Sub ein. Pub/Sub garantiert keine Reihenfolge. Es ist also nicht garantiert, dass ein Artikel/Post, der zuerst an Pub/Sub geliefert wurde, auch tatsächlich vor einem später an Pub/Sub gelieferten Artikel/Post an den Subscriber geliefert wird. Die folgende Abbildung zeigt dies schematisch:

Abb. 3: Eintreffen vs. Publikationszeit

Wenn wir in "Streams" denken, müssen wir unterscheiden zwischen dem Zeitpunkt, zu dem ein Ereignis stattgefunden hat, in unserem Fall der Zeitpunkt der Veröffentlichung, und dem Zeitpunkt der Verarbeitung. Wenn das Ereignis im Streaming-System zur Verarbeitung eintrifft. In der obigen Abbildung wurden zur besseren Lesbarkeit die Sekunden weggelassen und die Unterschiede in Minuten dargestellt. Wenn wir nun ein Zeitfenster von 2 Minuten definieren, sehen die Körbe für die Worthäufigkeiten wie folgt aus

Abb. 4: 3 Körbe nach Publikationszeit

Die grosse Frage ist nun: Woher weiss das System, dass nach dem Ende des ersten Korbes, also wenn "B" mit Publikationszeit 10.32 Uhr eingetroffen ist, noch weitere Artikel/Posts mit der Publikationszeit zwischen 10.30 Uhr und 10.31 Uhr eintreffen werden, der Korb also noch nicht abgeschlossen werden kann? Merken Sie sich an dieser Stelle bitte den Begriff "Watermark". Wir kommen später darauf zurück.

Eine weitere Herausforderung, die wir erkennen können, ist die notwendige Parallelität. Wir gehen hier einfach davon aus, dass wir wissen, dass der erste Korb nach dem Eintreffen von "B" um 10.32 Uhr noch nicht abgeschlossen ist. Wir halten ihn für das folgende "C" um 10.30 Uhr offen. Wir müssen jedoch den um 10.32 Uhr eingetroffenen "B" bearbeiten. Das bedeutet, dass wir zu diesem Zeitpunkt zwei Körbe gleichzeitig bearbeiten müssen.

Bisher haben wir uns bei der Frage nach dem Wie implizit auf die Worthäufigkeit beschränkt (vereinfacht als ein Schritt). Mit den Elementen eines definierten Korbes können wir natürlich ganz unterschiedliche Bearbeitungen vornehmen und diese auch kombinieren. Wir können sie z.B. filtern, paarweise zusammenfassen oder gruppieren. Die folgende Abbildung zeigt diese Möglichkeiten:
Abb. 5: Verschiedene Möglichkeiten der Verarbeitung

Wir definieren also das Fenster, das den Inhalt des Warenkorbs festlegt, das Was, und wie wir einen solchen Korb bearbeiten, das Wie. Das ist wichtig, weil je nach Algorithmus eine Parallelisierung der Verarbeitung in diesem Schritt möglich oder besser möglich ist oder nicht.

In diesem Artikel beschränken wir uns auf diese Fragen und Herausforderungen von Streaming-Systemen (man könnte Bücher damit füllen) und fassen unsere bisherigen Erkenntnisse zusammen: In einem endlosen Strom von Ereignissen müssen wir definieren, wann wir einen "Schnitt" machen können, um einen Teil des Stroms als eine Einheit verarbeiten zu können. Diesen "Schnitt" können wir auf verschiedene Weise definieren. Eine zeitliche Definition ist insbesondere dann schwierig, wenn nicht der Zeitpunkt des Eintritts in das Streaming-System relevant ist, sondern der ursprüngliche Zeitpunkt des Ereignisses ausserhalb des Streaming-Systems. Dies kann zu Situationen führen, in denen wir mehrere "Schnitte" gleichzeitig offen halten und die eintreffenden Ereignisse dem entsprechenden Korb zuordnen müssen. Für die Verarbeitung eines Korbes gibt es unterschiedliche Möglichkeiten. Abhängig von den Anforderungen können wir Elemente u.a. filtern (wenn wir nur bestimmte Elemente brauchen), gruppieren (wenn wir gruppenspezifische Verarbeitungen brauchen) oder Paare bilden (wenn wir bspw. Schlüssel- und Wertpaare bilden können).

Etwas Geschichte

Bevor wir uns anschauen, wie die eine oder andere Frage und Herausforderung in Apache Beam beantwortet bzw. gelöst wurde, ist es an der Zeit für ein wenig Geschichte. Warum das? Aus zwei Gründen: 1) Es hilft zu verstehen, wo Apache Beam herkommt und was es mit Apache Spark, Apache Flink und Googles Dataflow zu tun hat. 2) Wir stehen alle auf den Schultern von Giganten. Dass wir heute relativ einfach und abstrakt robuste Streaming-Systeme implementieren können, verdanken wir vielen Leuten, die sich jahrelang mit diesem und verwandten Themen beschäftigt haben. Das sind Leute im Hintergrund, die kaum jemand kennt. Zu schreiben, wie einfach es ist, grossartige Streaming-Anwendungen zu schreiben, ohne sie zumindest als Kohorte zu würdigen, fände ich nicht angemessen.

MapReduce (2003): Google veröffentlichte das MapReduce-Framework, das die Datenverarbeitung in grossem Massstab revolutionierte. Es bot ein einfaches Programmiermodell, das auf zwei Hauptfunktionen, Map und Reduce, basierte und die Verteilung von Aufgaben auf viele Knoten in einem Cluster ermöglichte. Hadoop (2004): Apache Hadoop wurde von Doug Cutting (welcher nebenbei bemerkt auch Apache Lucene geschrieben hat) und Mike Cafarella entwickelt und basiert auf den Prinzipien von MapReduce. Hadoop hat die Idee von verteilten Speichersystemen und einer verteilten Datenverarbeitungsplattform in die Open-Source-Welt gebracht, die es vielen Unternehmen ermöglicht, grosse Datenmengen effizient zu verarbeiten. Flume (2007): Apache Flume wurde entwickelt, um grosse Mengen von Streaming-Daten in Echtzeit zu sammeln, zu aggregieren und zu verschieben. Es wurde vor allem für das Sammeln von Logdaten in verteilten Systemen entwickelt. MillWheel (2007): Google MillWheel ist ein Framework, welches eine hoch skalierbare und sichere Prozessierung von Streaming-Prozessen ermöglicht. MillWheel führte Konzepte wie strikte Konsistenz, geringe Latenz und exakte einmalige Verarbeitung ein. Diese Konzepte hatten grossen Einfluss auf die spätere Entwicklung von Google Dataflow.

Exkurs: Bisher haben wir festgestellt, dass Ereignisse manchmal zu spät eintreffen können. Wir hatten uns noch nicht damit beschäftigt, was passiert, wenn Ereignisse zu spät eintreffen und der entsprechende Korb bereits bearbeitet wurde. Ausserdem sind wir bei all den Definitionen und Parallelverarbeitungen bisher intuitiv davon ausgegangen, dass jedes Ereignis logischerweise nur einmal verarbeitet wird. "C" um 10.30 Uhr landet genau in einem Korb und wird genau einmal verarbeitet. Was uns normal erscheint, war nicht immer so und ist im Detail nicht immer trivial. In MillWheel hat das Konzept "Exactly one matters" einen gebührenden Platz gefunden und es wurden wichtige theoretische Grundlagen geschaffen, die auch heute noch gelten und angewendet werden.

Apache Spark (2009): Apache Spark wurde entwickelt, um den Beschränkungen von MapReduce zu begegnen. Es bietet eine schnellere In-Memory-Verarbeitung und unterstützt sowohl Batch- als auch Streaming-Datenverarbeitung. Die Erweiterbarkeit von Spark und seine Fähigkeit, komplexe Datenverarbeitungs-Workflows zu erstellen, machten es schnell populär. Apache Flink (2010): Flink wurde entwickelt, um Datenströme in Echtzeit zu verarbeiten. Es bietet ein leistungsfähiges Framework für die Verarbeitung von Streaming- und Batch-Daten mit geringer Latenz und hohem Durchsatz.

Apache Storm (2011): Storm ermöglichte Echtzeitdatenverarbeitung mit geringer Latenz und hoher Fehlertoleranz. Es wurde von Twitter entwickelt und fand weite Verbreitung in Szenarien, die eine schnelle Datenverarbeitung erforderten. Google Dataflow (2014): Dataflow war ein weiterer Schritt in der Evolution der Datenverarbeitung bei Google, basierend auf den Erfahrungen mit MapReduce und MillWheel. Dataflow bietet ein flexibles und skalierbares Modell für Batch- und Streaming-Verarbeitung und wurde als vollständig verwalteter Service auf der Google Cloud Platform eingeführt. Wir kommen noch detaillierter auf Dataflow zurück. Apache Beam (2016): Apache Beam entstand aus Google Dataflow und brachte das Konzept einer einheitlichen Programmierumgebung für Batch- und Streaming-Datenverarbeitung in die Open-Source-Welt. Apache Beam bietet ein SDK, das von verschiedenen Laufzeitumgebungen unterstützt wird, darunter Google Dataflow, Apache Spark, Apache Flink und andere. Es abstrahiert die zugrundeliegende Verarbeitungslogik und ermöglicht Entwicklern, ihre Pipelines einmal zu schreiben und überall auszuführen.

Darüber hinaus gibt es weitere wichtige Entwicklungen im Ökosystem der Streaming-Anwendungen. Hier sind sicherlich Apache Kafka und Google Pub/Sub zu nennen. Beide ermöglichen es, Streaming-Anwendungen asynchron aus verschiedenen Quellen mit Daten zu versorgen.

Wie man sehen kann, war es für die generell schnelle technologische Entwicklung in der IT ein eher langer Weg von ersten Konzepten wie MapReduce bis hin zu robusten und relativ einfach zu implementierenden Streaming-Anwendungen.

Abstraktion mit Apache Beam

Dataflow von Google hatte zwei starke Komponenten. Erstens die Laufzeitumgebung, die allgemein als "Runner" bezeichnet wird. Zweitens ein SDK. Dieses SDK implementierte ein einheitliches Programmiermodell ("unified programming model"), das eine grösstmögliche Abstraktion der zugrundeliegenden Prozesse darstellte. So muss sich der Entwickler beispielsweise keine Gedanken darüber machen, wie er ein Zeitfenster robust implementiert. Er definiert einfach das gewünschte Zeitfenster mit dem SDK. Die ausführende Komponente, der Runner, kümmert sich um die entsprechende Implementierung. Google Dataflow hatte zu Entwicklungs- und Testzwecken einen lokalen Runner, also eine vereinfachte Laufzeitumgebung, mit der ein Entwickler das Verhalten seiner Anwendung schnell lokal testen konnte. Google entschied sich dann, im Einklang mit seiner Philosophie, das SDK und den lokalen Runner von Dataflow als Open Source Software an die Apache Foundation zu übergeben. Dies war die Geburtsstunde von Apache Beam. Das GCP-Produkt Dataflow ist nun eine vollständig verwaltete, robuste und hoch skalierbare Ausführungsplattform für Streaming-Anwendungen, die mit Apache Beam entwickelt wurden. Nachdem das SDK Open-Soruce wurde, haben auch andere Laufzeitumgebungen für Streaming-Anwendungen Apache Beam-kompatible "Runner" implementiert. Damit kann dieselbe Anwendung nicht nur in Dataflow, sondern beispielsweise auch in Apache Spark oder Apache Flink ausgeführt werden. Ich möchte an dieser Stelle nicht weiter ins Detail gehen. Nur so viel sei gesagt: Es gibt Unterschiede in der Implementierung, die sich auf das Verhalten der Anwendung auswirken können. Bevor wir einen Blick auf das Abstraktionsmodell von Apache Beam werfen, muss ich noch etwas klarstellen. Bisher war immer von Streaming-Anwendungen die Rede und ich habe eingangs erläutert, dass es einen Unterschied zur klassischen Batch-Verarbeitung gibt. Das ist immer noch richtig. Wenn wir verstehen, wie Streaming-Anwendungen funktionieren, können wir uns aber auch vorstellen, dass eine Batch-Verarbeitung einfach eine spezielle Variante einer Streaming-Anwendung ist. Einfach eine, die klarer und vorhersehbarer ist. Und genau so wurde Apache Beam entwickelt. Man kann damit Batch- und Streaming-Anwendungen auf die gleiche Art und Weise implementieren. Im Umfeld von Apache Beam spricht man in diesem Zusammenhang oft von "bounded" und "unbounded data". Dabei sind "bounded data" Batch-Verarbeitungen und "unbounded data" Stream-Verarbeitungen. Das hat den grossen Vorteil, dass man die gleiche Technologie verwenden kann. Ich kenne das so, dass man in einem Produkt oder auch in einem Unternehmen in der Regel mehr Batch-Prozesse hat als echte Streaming-Anwendungen. Wenn man diese in Apache Beam implementiert, hat man genug Know-how für die selteneren echten Streaming-Anwendungen.

Das Programmiermodell von Apache Beam besteht im Grunde aus wenigen einfachen wichtigen Komponenten/Objekten (nicht abschliessend):

Programmfluss

Pipeline:
Die Pipeline ist das zentrale Objekt, das die gesamte Datenverarbeitung beschreibt. Sie enthält alle PCollections und PTransforms und definiert damit den Ablauf der Datenverarbeitung, vom Eingang der Daten bis zum Abschluss.

PCollection:
PCollection steht für "Parallel Collection". Die PCollection ist eine generische Datenstruktur im Apache Beam Programmiermodell, die eine verteilte, unveränderliche Sammlung von Datenrepräsentationen darstellt. Eine PCollection kann eine beliebige Anzahl von Elementen beliebiger Datentypen enthalten. PCollections werden als Eingaben an die Transformationsschritte (PTransforms) übergeben und jede PTransform hat wiederum mindestens eine PCollection als Rückgabewert. Da diese unveränderlich sind, wird bei Änderungen während eines Transformationsschrittes ein neues Objekt der PCollection erzeugt.

PTransform:
PTransform steht für "Parallel Transformation" und ist eine Abstraktion, die eine Datenverarbeitungsoperation auf einer oder mehreren PCollections darstellt. PTransforms beschreiben die Verarbeitungslogik und können von einfachen Operationen wie Filtern und Mappen bis hin zu komplexeren Aggregationen reichen. PTransforms können auf verschiedene Weise erstellt werden: Durch vorgefertigter Standardtransformationen wie GroupByKey, Combine usw. und mittels benutzerdefinierter Transformationen. Jede PTransform nimmt eine oder mehrere PCollections als Eingabe (Input) und erzeugt eine oder mehrere PCollections als Ausgabe (Rückgabewert). PTransform-Objekte definieren also, wie Daten der PCollection(s) transformiert werden.

Einstellungen/Konfigurationen

Window:
Windows definieren Zeitintervalle oder andere Kriterien, nach denen Elemente in einer PCollection gruppiert werden. Es gibt verschiedene Standard Windowing-Strategien: Fixed Windows, Sliding Windows oder Session Windows. Es ist möglich, eigene Strategien zu implementieren.
Fixed Windows:
Die Fenster lösen sich nahtlos ab, es gibt keine Überlappung. Wenn das erste Fenster endet, beginnt das nächste Fenster. Man definiert einfach die Zeitdauer, bspw. 5 Minuten.
Sliding Windows:
Die Fenster überlappen sich in der Regel, je nach Konfiguration. Man definiert die Zeitdauer und das Intervall. Bspw. 5 Minuten und 1 Minute. Alle Minute wird ein Fenster geöffnet, das 5 Minuten dauert. Damit wird ein Element, das bei 4 Minuten auftaucht, in den Fenstern 1, 2, 3 und 4 enthalten sein. Sie haben womöglich bereits festgestellt, dass das Konzept von "Exactly one matters" hier eine andere Bedeutung hat, als Sie vielleicht beim ersten Lesen des Konzeptes angenommen haben. Exakt einmal bezieht sich auf die Fenster. Wann braucht man diese Strategie? Die Strategie ist vor allem in Szenarien sinnvoll, wo man Trends berechnen will. Da möchte man vielleicht jede Minute einen Trend der letzten 5 Minuten ausgeben.
Session Windows:
Session Windows definieren eine Gruppierung und eine Inaktivitätsdauer. Die Fenstergrösse ist damit variabel. Session Windows eigenen sich, wenn man eine logische Einheit von Daten zusammenfassen will. Ein typisches Beispiel ist eine Benutzerinteraktion auf einer Web-Applikation. Man möchte das Verhalten analysieren und dabei aber jeweils nur Aktionen als Einheit erfassen, welche zeitlich eng zusammen hängen. Klickt der Benutzer also bspw. während 5 Minuten nicht mehr, schliesst die Session und die nächsten Aktionen des Benutzers werden in einer neuen Session analysiert.

Watermark:
Im Deutschen ist es meiner Erfahrung nach wichtig, die richtige Übersetzung für diesen Begriff zu verwenden. Assoziieren Sie Watermark nicht mit dem deutschen Begriff "Wasserzeichen", sondern mit den Begriffen "Wasserstand" oder "Pegelstand". Mit einem Wasserzeichen auf Banknoten oder Briefpapier hat der Begriff hier wenig zu tun. Watermark ist eine zentrale Komponente des Apache Beam zugrundeliegenden Konzepts. Watermarks bieten eine Balance zwischen Latenz und Vollständigkeit der Datenverarbeitung. Dabei ist ein Watermark eigentlich kein fester Pegel. Vielmehr gibt es einen Schätzwert an. Eine Schätzung des Zeitpunkts T, zu dem alle erwarteten Daten eingetroffen sein werden. Sie erinnern sich, dass bei einer Batch-Verarbeitung klar ist, wann alle Elemente eingetroffen sind, aber nicht bei einem Stream. Schauen Sie sich Abbildung 4 noch einmal an. Woher weiss der Stream-Prozess, dass nach "B" um 10:32 Uhr noch ein "C" um 10:30 Uhr eintreffen wird? Ganz einfach. Es ist eine Schätzung, genannt Watermark, wann - Zeitpunkt T - alle Elemente des ersten Fensters angekommen sein werden. Es ist u.U., aber nicht in jedem Fall, wichtig, eine Balance zu finden zwischen einer kurzen Latenz, d.h. der Notwendigkeit, die Ergebnisse so schnell wie möglich zu liefern und das Fenster so schnell wie möglich zu schliessen, und der Notwendigkeit der Vollständigkeit, d.h. möglichst alle zugehörigen Elemente zu berücksichtigen. Es würde zu weit führen, auf die Berechnung von Watermarks im Detail einzugehen und der Vorteil ist, dass sich ein Entwickler in der Regel nicht mit der Implementierung beschäftigen muss. Im Prinzip handelt es sich um Heuristiken, die auf der Grundlage verschiedener Daten der Quelle berechnet werden. Apache Beam berechnet streng genommen keine Watermarks. Das macht der Runner. Dies ist übrigens einer der Unterschiede zwischen verschiedenen Runnern wie Apache Spark, Apache Flink und Dataflow. Nicht jeder dieser Runner berechnet die Watermarks für jede Quelle gleich. Daher kann es trotz identischem Apache Beam Quellcode zu unterschiedlichen Ergebnissen kommen. Das Dataflow-Team von Google hat beispielsweise versucht, eine optimierte Watermark-Berechnung für die Quelle Pub/Sub zu implementieren.

Trigger:
Trigger bestimmen, wann die gesammelten Elemente in einem Fenster weiterverarbeitet und die Ergebnisse emittiert werden, das Fenster also geschlossen wird. Es gibt vier Standard-Trigger:
AfterWatermark:
Wird gefeuert, sobald der Zeitpunkt gemäss Watermark erreicht worden ist, an dem alle Ereignisse eingetroffen sein sollten. Das ist der häufigste Trigger für zeitgesteuerte Fenster, insb. wenn es eine Balance zwischen Latenz und Vollständigkeit geben soll.

AfterProcessingTime:
Wird gefeuert basierend auf der Verarbeitungszeit nach einer bestimmten Verzögerung. Es kommt also nicht die Watermark zum Zug. Als Zeit gilt der Zeitpunkt, wo die Ereignisse eintreffen. Ein solcher Trigger von 5 Minuten, wird bspw. 5 Minuten nach dem Eintreffen des ersten Ereignisses gefeuert. Dieser Trigger ist dann nützlich, wenn die Verarbeitungszeit (Latenz) höher gewichtet wird als die Vollständigkeit. Egal ob noch Ereignisse eintreffen, nach 5 Minuten (in unserem Beispiel) wird das Fenster geschlossen, später eintreffende Ereignisse werden in einem neuen Fenster behandelt.

AfterPaneElementCount:
Wird gefeuert, wenn eine bestimmte Anzahl von Elementen in einem Fenster gesammelt wurden. Die Zeit spielt bei diesem Trigger also keine Rolle. Bei einem solchen Trigger von 10, wird gefeuert, nach dem 10 Ereignisse eingetroffen sind. Dieser Trigger ist hilfreich, wenn man jeweils eine fixe Anzahl von Ereignissen zusammen verarbeiten will.

Repeatedly:
Dieser Trigger wiederholt einen zu Grunde liegenden Trigger und feuert entsprechend, wenn der zu Grunde liegende Trigger feuert, nach einer definierten Frist ("delay"). Angenommen, wir haben einen AfterProcessingTime-Trigger von 5 Minuten. Dieser feuert 5 Minuten nach dem Eintreffen des ersten Ereignisses. Mit dem Repeatedly-Trigger können wir nun diesen Trigger bspw. alle 5 Minuten ("delay = 5) wiederholen. Also 5 Minuten nach dem ersten feuern wird der Trigger erneut ausgelöst. Dieser Trigger kann nützlich sein, wenn man regelmässige Aktualisierungen braucht, bspw. in Realtime Dashboards.

Timestamp:
Zeitstempel sind den Elementen in einer PCollection zugeordnet und repräsentieren den Zeitpunkt, zu dem das Ereignis aufgetreten ist.

Programm-Logik/Datenmanipulation
GroupByKey:
GroupByKey gruppiert Elemente einer PCollection nach Schlüssel und erzeugt eine neue PCollection, in der alle Werte mit demselben Schlüssel zusammengefasst sind.

Combine:
Combine ist eine Aggregation auf einer PCollection. Damit können Standard-Aggregationen wie Summe oder arithmetisches Mittel oder benutzerdefinierte Aggregationen durchgeführt werden. Das Ergebnis ist eine neue PCollection.

Flatten:
Flatten kombiniert mehrere PCollections zu einer einzigen PCollection. Damit kann man bspw. mehrere eingehende PCollections in eine zusammenführen und als Output einem nächsten Schritt übergeben, welcher dann bspw. darauf eine Aggregrations-Funktion (Combine) ausführt.

Partition:
Partition ist das Gegenteil von Flatten und teilt eine PCollection in mehrere PCollections auf.

Filter:
Wie es der Name vermuten lässt, kann man mit dieser PTransform einzelne Elemente aus einer PCollection herausfiltern.

Read/Write:
Der Name ist Programm. Read und Write sind PTransforms, die zum Lesen von Daten aus einer Quelle resp. zum Schreiben von Daten in ein Ziel verwendet werden können.

ParDo:
ParDo ist eine PTransform, die auf jedes Element einer PCollection eine benutzerdefinierte Verarbeitung anwendet. Wie der Name es vermuten lässt ("Par" steht für Parallel), ermöglicht sie die parallele Verarbeitung von Daten. Die benutzerdefinierte Verarbeitung wird dabei in einer DoFn definiert.

DoFn:
DoFn steht für "Do Function". In einer DoFn implementiert man eine benutzerdefinierte Verarbeitung eines Elements einer PCollection. Die DoFn wird dabei von der ParDo aufgerufen, welche die DoFn auf jedes Element der PCollection anwendet.

Externe Bibliotheken:
Es gibt zahlreiche Bibliotheken von Dritten, welche Apache Beam Schnittstellen implementieren und damit Funktionen bereitstellen. Beispiele sind der Pub/Sub- oder BigQuery-Reader von Google, mit denen man einfach Pub/Sub-Messages oder BigQuery-Abfragen als Ereignis-Quellen für eine Apache Beam Pipeline nutzen kann.

Ein Bild sagt mehr als Tausend Worte. Schauen wir uns die schriftliche Theorie des Modells einmal in einer vereinfachten grafischen Darstellung an:

Abb. 6: Vereinfachte Darstellung einer Pipeline mit PCollections und PTransforms.

Was auffällt: Wir haben eine Pipeline und PCollection und PTransform. Aber wo sind Window, Watermark und Trigger? Oder GroupyByKey, ParDo etc.? Window, Watermark (falls verwendet) und Trigger stehen am Anfang bei/nach "Create". Sie definieren, welche konkreten Artikel in der PCollection in die jeweilige Instanz der Pipeline kommen. Die eigentliche Pipeline ab "Artikel" wird, je nach Konfiguration und Daten, parallel ausgeführt. Schauen wir uns das auch grafisch an:

Abb. 7: Window, Trigger und Watermark steuern den "Start" der Pipeline-Instanz

GroupByKey, Combine, Flatten, Filter etc. sind Standard PTransforms (grüne Kästchen), die einfach konfigurativ in der Pipeline verwendet werden können. In unserem Fall könnte man bspw. die Wörter der PCollection in einem Schritt mit GroupyByKey gruppieren. ParDo mit einer DoFn könnte man einsetzen, wenn man die einzelnen Wörter normalisieren möchte (Stichworte Stemmer, Gross-/Kleinschreibung etc.).

Eine Frage haben wir bisher noch gekonnt ignoriert. Was passiert, wenn "C" um 10.30 Uhr zu spät kommt und trotz gutem Watermark das Fenster bereits geschlossen ist und die PCollection dieses Zeitraums bereits losgeschickt wurde? Für dieses Problem gibt es in Apache Beam grundsätzlich zwei Lösungsmöglichkeiten. Die erste ist die Option "allowedLateness" als Window-Eigenschaft. Mit dieser kann man definieren, wie lange verspätete Ereignisse akzeptiert werden. Diese kommen dann zusätzlich in das nächste, eigentlich falsche Window. Mit einer Einstellung von 2 Minuten, werden also auch bis um 2 Minuten verspätete Ereignisse noch verarbeitet. Spätere werden ignoriert. Die zweite Option ist häufig die besser und entsprechend auch die häufiger genutzte: Die Aufzählung der "Trigger" weiter oben ist nicht vollständig. Es gibt auch noch einen "Late Trigger". Dieser wird für alle zu späten Ereignisse gefeuert und man kann definieren, wann, resp. wie oft der dafür ausgeführt werden soll (bspw. nach Zeit oder Anzahl solcher Ereignisse). Damit hat man die Möglichkeit, diese zu späten Ereignisse gesondert zu behandeln. Die beiden Optionen können auch kombiniert werden.

Nun haben wir bald alles beisammen, um uns ein erstes Pipeline-Programm anzusehen. Zuvor müssen wir uns aber noch kurz grundsätzlich mit dem Apache Beam SDK beschäftigen. An dieser Stelle sei darauf hingewiesen, dass die Einführung in Apache Beam in diesem Artikel nur einen groben Überblick darstellt und als Motivation gedacht ist, sich dieses Framework genauer anzusehen. Im Rahmen dieses Artikels können wir nur an der Oberfläche kratzen. Mit "wir" bin vor allem ich als Autor gemeint. Zurück zum SDK. In welcher Programmiersprache programmiert man eigentlich Apache Beam? Das Apache Projekt Apache Beam unterhält offiziell die SDKs für folgende Programmiersprachen:
  • Java
  • Python
  • Go
  • TypeScript
Dazu gibt es ein offizielles SDK von Spotify für die Programmiersprache Scala: https://spotify.github.io/scio/ 

Weiter bietet das Apache Beam Projekt ein YAML-Interface an. Damit lassen sich Apache Beam Pipelines rein deklarativ, ohne Programmierung erstellen. Das eignet sich vor allem für einfache standardisierte Pipelines, die mit den Standard Transformatoren auskommen und keine eigene Logik brauchen. Die Dokumentation der Apache Beam SDKs finden Sie hier: https://beam.apache.org/documentation/sdks/java/ 

Bleibt die Frage, in welcher Sprache man Apache Beam Anwendungen am besten schreibt. Meiner Meinung nach in der Sprache, die man am besten kennt, bzw. die am häufigsten für ähnliche Zwecke verwendet wird. Ich schreibe die folgenden Beispiele in Python, da ich der Meinung bin, dass der Apache Beam Quellcode in Python am intuitivsten zu lesen ist.

Schauen wir uns eine konkrete Pipeline in Python an. Wir konzentrieren uns hier direkt auf die Implementierung der Pipeline. Den Rumpf eines kompletten Programms lassen wir aussen vor. Python lässt viel Spielraum, wie man das "Drumherum" mit Konfiguration, Funktionen, Auslagerung in eigene Klassen etc. gestalten will. Vollständige Codebeispiele gibt es viele. Die offiziellen finden Sie hier: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples 

Abb. 8: Code-Ausschnitt einer einfachen Apache Beam Pipeline in Python


Bevor wir uns den Code der Reihe nach ansehen, die Erklärung zu den wichtigsten zwei überschriebenen Python-Operatoren: | und >>. Diese erlauben es, dass eine Pipeline in Python sehr gut lesbar ist. Der bitweise nicht ausschliessende ODER Operator ("|") wird im Apache Beam SDK für die Verkettung genutzt. Der Operator für eine bitweise Verschiebung nach Rechts (">>") wird verwendet, um den Namen/Titel eines Schrittes mit dessen Implementierung zu verbinden. Ein Java/Python Vergleich macht dies ggf. einfacher verständlich:

Java: p.apply("Name/Titel", PTransform...)
Python: p | "Name/Titel" >> PTransform...

Orientieren wir uns zuerst grob im Code anhand der bekannten grafischen Darstellung:

Abb. 9: Mapping Code zu grafischer Darstellung

  1. bean.Pipeline p öffnet die Pipeline-Definition (gelber Bereich)
  2. ReadFromPubSub ist das "Create", die Quelle der Daten
  3. WindowInto die die Fenster-Funktion
  4. FlatMap als erster PTransformer der eigentlichen Bussiness-Logik
  5. Map hier als zweiter PTransfomer der Business-Logik
Die Pipeline im Code kann übersichtlich im Fluss von oben nach unten definiert werden. Schauen wir uns die einzelnen PTransformer-Funktionen genauer an:

Artikel dekodieren:
Mit dieser Map-Funktion werden die Texte aus Pub/Sub in utf-8 kodiert. Es ist wichtig eine definierte Kodierung sicherzustellen, da wir im Anschluss String-Operationen (Regex) darauf anwenden. Dies ist ein rein technischer Schritt.

In fixe Zeitfenster aggregieren:
Mit WindoInto definieren wir ein fixes Zeitfenster von einer Minute. Der Parameter sind Sekunden und mit 1 * 60 ist dies besser verständlich. Sollte man in einer echten Anwendung parametrisiert umsetzen und nicht hart programmieren.

Worte splitten:
Hier übergeben wir der FlatMap-Funktion eine anonyme Funktion, welche die Texte in einzelne Worte splittet. Dazu verwenden wir eine Regex-Funktion. Dabei werden alle Worte ohne Sonderzeichen extrahiert. In diesem, oder einem zusätzlichen, Schritt könnten weitere Normalisierungen vorgenommen werden. Stichworte sind Stemmer oder Gross-/Kleinschreibung.

Paare bilden:
Hier bilden wir Paare mit der Map-Funktion als Vorbereitung für die spätere Zählung der Wörter (das Wort ist der Schlüssel).

Gruppieren nach Worten uns summieren:
Mit der CombinePerKey-Funktion findet eine Aggregation auf den Schlüssel (in unserem Fall das Wort) statt und die Werte werden summiert. Damit erhalten wir die Anzahl pro Wort.

Ergebnis formatieren:
Diese Map-Funktion zur Formatierung dient lediglich zur schönen Darstellung.

Ergebnis kodieren:
Pub/Sub, resp. die Funktion die wir verwenden, erwartet Bytes, wir haben intern String zur Verarbeitung. Wir stellen erneut utf-8 sicher und geben das Resultat dann vom Typ Byte zurück.

Ergebnis in Pub/Sub publizieren:
Schliesslich publizieren wir das formatierte Ergebnis wieder in Pub/Sub.

Soweit so klar. Sie Fragen sich, wo denn nun der Trigger, das Watermark oder all die PCollections im Code zu finden sind? Berechtigte Fragen. Trigger: Wenn wir ein FixedWindow nutzen, brauchen wir keinen expliziten Trigger. Der Trigger ist implizit und wird nach definierter Frist, in unserem Fall alle 60 Sekunden, ausgelöst. Watermark: Da wir ein FixedWindow haben und sich der Timestamp auf den Eintritt in die Pipeline bezieht (wir haben keinen externen Timestamp definiert), brauchen wir kein Watermark. Wir könnten in unserem Beispiel ein Watermark als Trigger verwenden und dabei auch gleich verspätete Daten akzeptieren. Das könnte folgendermassen umgesetzt werden:

Abb. 10: Beispiel mit Watermark als Trigger und allowed lateness

Im Beispiel oben wurde anstelle eines FixedWindows ein GlobalWindows verwendet und als expliziten Trigger AfterWatermark. Der zuletzt verwendete Accumuation-Mode dient dem Aufräumen.

PCollections werden implizit übergeben. Die Funktion beam.Map(lambda x: (x, 1)) gibt eine PCollection als Rückgabewert. Der Runner übergibt diese für uns der nachfolgenden Funktion beam.CombinePerKey(sum). Diese Übergabe müssen wir nicht explizit programmieren.

Damit sind wir am Schluss der Einführung von Streaming-Applikationen im allgemeinen und den Grundzügen von Apache Beam als Unified Programming Model zur Entwicklung von Streaming-Applikationen angekommen.

Effizient und skalierbar ausführen mit Dataflow

Eine Apache Beam-Anwendung muss von einem Runner ausgeführt werden. Im Apache Beam SDK gibt es dafür einen lokalen Runner, der aber nicht für den produktiven Einsatz gedacht ist. Ein sicherer, skalierbarer und verwalteter Runner wird von Google mit Dataflow zur Verfügung gestellt. Eine Apache Beam-Anwendung in Dataflow zu deployen ist denkbar einfach und nicht wesentlich aufwändiger als das lokale ausführen:

Abb. 11: Apache Beam Bereitstellung auf Dataflow

Dies ist ein Beispiel für Python. Stellen Sie sicher, dass das Apache Beam SDK installiert ist. Anschliessend können Sie die Breitstellung direkt ausführen.

Es versteht sich von selbst, dass je nach Anwendung und Landingzone noch Vorarbeiten notwendig sind. Verwenden Sie Pub/Sub, müssen Sie ggf. noch die entsprechenden Topics einrichten. Dazu werden Sie, falls nicht bereits aktiviert, zur Aktivierung von zahlreichen GCP APIs aufgefordert. Eine vollständige Anleitung dieser Schritte finden Sie bspw. hier: https://cloud.google.com/dataflow/docs/quickstarts/create-pipeline-python?hl=de

Realtime Transaktionsüberprüfung als Stream

Die Banken sind zumindest in der Schweiz u.a. aufgrund des Embargogesetzes und seiner Verordnungen faktisch verpflichtet, die Absender und Empfänger von Zahlungen gegen die "Sanktionslisten" zu überprüfen. Je nach Risikoeinschätzung werden solche Prüfungen gegen weitere Personen- und Entitätsdatenbanken durchgeführt. Diese Prüfungen sollten vor der eigentlichen Ausführung erfolgen. Ist die Zahlung erst einmal ausgeführt, kann bereits ein Verstoss vorliegen. Technisch bedeutet dies, dass die Transaktionsprüfung "in Echtzeit" erfolgen muss. Diese Anforderung kann durch verschiedene Architekturmuster erfüllt werden. Jedes hat seine Vor- und Nachteile, die es zu gewichten und abzuwägen gilt. Ich möchte zunächst 3 verschiedene Architekturmuster kurz vorstellen und auf deren Vor- und Nachteile eingehen.

Synchron in einem Schritt:
Bei diesem Modell ruft die eigentliche Zahlungsverkehrsfunktion die Name-Matching-Funktion synchron in ihrem eigenen Prozess auf. Sie wartet auf das Ergebnis und setzt dann die Verarbeitung fort.

Vorteile:
Dieses Pattern hat den Vorteil, dass die Prozesslogik an einer Stelle liegt und die Funktion relativ einfach getestet werden kann. Mit Unit-Tests gegen einen Mock-Service kann die Funktion bereits recht umfassend getestet werden.

Nachteile:
Die Nachteile dieses Patterns liegen vor allem in der Performance, allenfalls in den Kosten und der Wartbarkeit. Performance: Die Zahlungsverkehrsanwendung muss auf die Antwort warten. Dies führt dazu, dass bei vielen Zahlungen wesentlich mehr Instanzen der Funktion vorhanden sein müssen. Dies erfordert mehr RAM und CPU oder es besteht die Gefahr, dass die gesamte Applikation langsamer wird. Kosten: Je nach Implementierung bzw. Betrieb der Name-Matching-Funktion können höhere Kosten anfallen. Insbesondere wenn die Funktion mit einem Modell wie Cloud Functions betrieben wird, wo die Anzahl Aufrufe einen direkten Einfluss auf die Kosten hat. Wartbarkeit: Die Zahlungsverkehrsfunktion muss alle Fehlerbehandlungen, Re-Try-Mechanismen etc. selbst implementieren. Dies macht die Anwendung zu einem Monolithen, der mit der Zeit aufwendiger zu warten ist und die Komplexität bei Änderungen erhöht.

Entkopplung mit Pub/Sub:
In diesem Pattern wird der Prozess entkoppelt. Die Zahlungsverkehrsanwendung führt die bisherigen Prüfungen wie Bonitätsprüfung etc. durch. An der Stelle der Namensprüfung speichert sie den Status und veröffentlicht die Zahlung in Pub/Sub. Dies triggert die Name-Matching-Funktion, welche die Prüfung durchführt, den Status speichert und im positiven Fall via Pub/Sub die Zahlungsverkehrsanwendung zur Ausführung der Zahlung triggert.

Vorteile:
Mit diesem Pattern halten wir die Zahlungsverkehrsanwendung schlank und leicht wartbar. Die Performance bleibt praktisch unverändert, das Publizieren in Pub/Sub benötigt wenig Ressourcen. Während der Name-Matching-Prüfung kann die Zahlungsverkehrsanwendung ihre Ressourcen für andere Zahlungen nutzen.

Nachteile:
Der mögliche Kostennachteil des ersten Patterns, da jede Prüfung einzeln ausgeführt wird, bleibt bestehen. Im Vergleich zum ersten Pattern hat diese Lösung einen höheren Testaufwand. Der Aufbau der Testumgebung ist etwas höher, die Unit-Tests sind auf kleinere Units beschränkt, die Aussagekraft ist entsprechend geringer.

Transaktionsprüfung als Stream:
In diesem Pattern wird die Transaktionsprüfung als Stream betrachtet. Es kommen kontinuierlich Zahlungen an, die geprüft werden müssen. Die Zahlungsanwendung publiziert die Zahlung wie im vorherigen Pattern in Pub/Sub. Der Consumer dieses Pub/Sub-Topics ist nun aber keine Funktion, sondern eine Stream-Anwendung. Diese verarbeitet nun die Zahlungen in definierten Fenstern. Am besten in festen Grössen, z.B. immer 10 Transaktionen zusammen und einer maximalen Zeitspanne. Also nach 10 Transaktionen oder nach 30 Sekunden, was zuerst eintrifft. Je nach Möglichkeiten der Zahlungsverkehrsapplikation triggert die Stream-Applikation die Zahlungsverkehrsapplikation pro Zahlung oder im Batch via Pub/Sub zur Ausführung an.

Vorteile:
Mit diesem Pattern können wir die Name-Matching-Funktion in Batches aufrufen. So können wir z.B. 10 Zahlungen auf einmal prüfen lassen. Dies reduziert die Netzwerklast und die Anzahl der Funktionsaufrufe. Die Vorteile des vorherigen Patterns bleiben erhalten.

Nachteile:
Analog zur vorherigen Lösung ist der Testaufwand bei dieser Variante im Vergleich zur ersten Lösung höher, die Aussagekraft der einzelnen Unit-Tests ist eingeschränkter.

Das Rüstzeug für eine entsprechende Apache Beam-Anwendung haben wir eigentlich fast. Das einzige, was noch fehlt, ist die kombinierte Fensterfunktion. Wir wollen immer 10 Transaktionen zusammenfassen. Wenn es aber einmal nicht so viele Transaktionen gibt, wollen wir nicht lange warten, die Transaktionen sollen ja schnell ausgeführt werden. Deshalb haben wir die zweite Bedingung definiert, maximal 30 Sekunden, nachdem die erste Transaktion im neuen Fenster eingetroffen ist. Das bedeutet, dass der Start einer Prüfung im schlimmsten Fall 30 Sekunden dauert, wenn danach nicht mehr als 8 weitere Transaktionen eintreffen. Dazu können wir das bekannte globale Fenster mit eigenen Triggern verwenden. Dabei können wir mit "AfterAny" verschiedene Trigger kombinieren, die logisch oder ("OR") verknüpft werden. Unsere Window-Definition könnte entsprechend wie folgt aussehen:

Abb. 12: Kombiniertes Fenster mit Repeatedly und AfterAny

Kommen wir zum Name-Matching Teil. In einer Zahlung haben wir in der Regel vom Empfänger keinen strukturieren Namen. Das heisst, wir haben oftmals eine komplette Adresse in einem Feld. Dabei können durchaus auch mehrere Personen enthalten sein. Bspw.:

Herr und Frau
Hans u. Erika Müller-Meier
Musterstrasse 24
8001 Zürich
Schweiz

Dabei können wir uns nicht auf eine wohl strukturierte vollständige Adresse verlassen. Die könnte auch so aussehen:

Herr Hans u. Erika Müller-Meier
Mustergasse 24, CH-8001 Zürich

Da kommt die ESC-Funktion "findByAddress" ins Spiel. Dieser Funktion können wir eine komplette Adresse als String übergeben um unscharf nach Namen im Index zu suchen. Um die Struktur der Adresse brauchen wir uns dabei nicht zu kümmern.

Für die Prüfung per Batch können wir unsere Beispiel-Anwendung aus diesem Cloud Serie-Artikel: ESC Index-Applikation serverless mit Cloud Run betreiben um einen Endpoint erweitern, welcher als Input eine Liste von Strings (Adressen) mit zugehörigen Transaktionsnummern entgegennimmt. Diese alle prüft und das Resultat zurück liefert:

Abb. 13: Code-Erweiterung um einen neuen Rest-API Endpoint - Eine Liste von Adressen überprüfen

Abb. 14: Lokale Abfrage des neuen Rest-API Endpoint


Mit diesem Praxisbeispiel schliesse ich hiermit die Kurzeinführung in Streaming-Applikationen mit Apache Beam und deren Ausführung in Dataflow ab. Ich hoffe, Sie konnten die eine oder anderen Architektur-Inspiration mitnehmen.

Weiterführende Links:





Freitag, 17. Mai 2024

Cloud Serie: Das GCP Serverless Trio in der Übersicht - mit etwas GKE

In den letzten drei Artikeln in dieser Cloud Serie habe ich drei verschiedene Möglichkeiten der Google Cloud (GCP) vorgestellt, mit denen sich serverless Programme ausführen lassen:
Jeweils erläutert anhand einer Beispiel-Applikation basierend auf der ESC-Bibliothek. In diesem Artikel fasse ich diese 3 Möglichkeiten nochmals kurz zusammen und gebe in einer Gegenüberstellung Hinweise, welches Werkzeug sich für welchen Einsatz am besten eignen könnte.

Cloud Functions
Vorweg: Das Produkt heisst offiziell "Cloud Functions". Ich schreibe es aber auch nicht konsequent in der Mehrzahl. Wenn es nur eine Funktion ist, schreibe ich häufig "Cloud Function". Mit Cloud Functions können einzelne Funktionen zur Verfügung gestellt werden. Dazu gibt es zwei Möglichkeiten:
  • HTTP-Funktion: Aufrufbar via URL (i.d.R. REST-API)
  • Ereignisgesteuerte Funktionen: Sind von folgenden GCP-Ereignissen aufrufbar: Pub/Sub-Trigger, Cloud Storage Trigger, Firestore Trigger, EventArc-Trigger


Unterstützte Sprachen:
Cloud Functions können in einer der folgenden Programmiersprachen implementiert werden:
  • JavaScript/Node.js
  • Python
  • Go
  • Java/JVM-Sprachen (bspw. Scala, Kotlin)
  • C#
  • Ruby
  • PHP

App Engine
Mit App Engine können komplette Applikationen, also nicht nur einzelne Funktionen, bereitgestellt werden. Dies erfolgt in einem pro Sprache definierten Standardcontainer. Dabei gibt es jedoch zwei Ausprägungen. Man kann zwischen der Standard- und der Flexibler-Umgebung wählen. Einen Vergleich der beiden gibt es hier: https://cloud.google.com/appengine/docs/the-appengine-environments?hl=de


Unterstützte Sprachen:
App Engine Applikationen können in einer der folgenden Programmiersprachen implementiert werden:
  • JavaScript/Node.js
  • Python
  • Go
  • Java/JVM-Sprachen (bspw. Scala, Kotlin)
  • C#
  • Ruby
  • PHP

Cloud Run
Cloud Run ist eine vollständig verwaltete Container-Plattform. Mit Cloud Run kann man sehr einfach Applikationen als Docker-Image deployen.

Offizielle GCP-Präsenzhttps://cloud.google.com/run?hl=de

Unterschiede zu App Engine folgen gleich im nächsten Teil. An dieser Stelle möchte ich GKE als mögliche Alternative erwähnen. Wann braucht man GKE anstelle von Cloud Run? GKE ist für viele Anwendungen und gar für viele Organisationen mit Kanonen auf Spatzen geschossen. Cloud Run hat seine Limitierungen vor allem da, wo man eine Microservice-Architektur für eine Anwendung mit sehr vielen einzelnen Services hat, welche enge Abhängigkeiten untereinander haben. Solche einzeln via Cloud Run orchestrierst zu deployen und zu managen kann aufwändig sein. Dazu gehört auch das Monitoring, welches in GKE für solche Szenarien einfacher "out-of-the-box" feingranularer möglich ist. Wie bei App Engine mit Standard und Flexibel, gibt es auch bei GKE mehrere Ausprägungen. Standard und Premium und dann vor allem "normal" (um nicht wieder Standard zu verwenden) und Autopilot. Letzterer vereinfacht das Management und reduziert die Aussage der Kanonen und Spatzen verstärkt auf die Kostensicht. App Engine Flexibel, Cloud Run und GKE Autopilot liegen nahe beisammen. Es versteht sich von selbst, dass letztlich bei allen drei Produkten wiederum Kubernetes die technische Basis ist. Selbst die neuen Cloud Functions sind letztlich nichts anderes als Container. Man sieht diese Cloud Functions (2nd Generation) in der Konsole auch unter Cloud Run. Dort steht bei "Deployed by" Cloud Functions. So schliesst sich der Kreis.

Gegenüberstellung
Die folgende Tabelle zeigt aus meiner Sicht relevante unterschiede zwischen den drei Produkte auf.

Vergleich von Cloud Functions, App Engine und Cloud Run - ohne Gewähr

Bei Standard App Engine wählt man eine Maschine aus einer Klasse aus, die Leistung ist eher bescheiden. Epp Engine Flexibel ist dagegen ein Biest. Die Frage ist, ob es nicht Software architektonische Mängel gibt, wenn eine Applikation auf einer Maschine laufen muss mit so viel Power. Ggf. wäre es sinnvoller, die Last auf mehrere Instanzen zu verteilen. App Engine ist generell bezüglich Netzwerk und Sicherheit "einfacher", da mehr direkt konfiguriert werden kann, während man bei Cloud Run diesbezüglich mit Load Balancer, Cloud Armor etc. ggf. zusätzliche Konfigurationen machen muss. Ein grosser Vorteil von Cloud Run ggü. App Engine ist aus meiner Sicht VPC, da man damit GCP Dienste einfacher einbinden/verwenden kann, also via App Engine.

Fazit
GCP bietet verschiedene Möglichkeiten, Applikationen serverless zu betreiben. Cloud Functions sind wahrscheinlich am einfachsten zu charakterisieren und zu wählen. Die Wahl zwischen App Engine (Flexibel) und Cloud Run ist nicht immer trivial, da werden feine Unterschiede den Ausschlag geben. In die Harmonie drei kommt dann noch GKE und insb. GKE Autopilot als Möglichkeit dazu. Das macht die Wahl zugegebener Massen nicht einfacher. Ich persönlich würde wann möglich nicht mit GKE starten, wenn nicht von Beginn an klar ist, dass es das braucht. Viele Szenarien lassen sich letztlich einfacher mit den drei hier vorgestellten Möglichkeiten umsetzen. Und gerade wenn man Cloud Run nutzt und damit bereits eigene Container-Definitionen hat, ist ein Wechsel auf GKE nicht so weit weg. Bei der Verwendung von GKE braucht man einfach bald weitere Technologien wie bspw. Helm, um bspw. effizient deployen zu können. Jede zusätzliche Technologie/Produkt braucht wiederum Know-how und Management und birgt Risiken. Das muss gut überlegt sein, ob damit die Vorteile insgesamt wirklich überwiegen oder (noch) nicht. In den meisten Fällen gibt es bei der Wahl kein Richtig oder Falsch. Neben den technischen Eigenschaften der Produkte muss bei der Wahl auch immer das eigene Know-how resp. das Know-how des Teams, der Firma berücksichtigt werden. Ebenso relevant ist die Menge. Vielleicht ist für ein Anwendungsfall Cloud Functions genau das Richtige. Wenn man aber nur eine einzige Cloud Functions hat, ist dies wohl organisatorisch nicht optimal und man entscheidet sich bspw. für Cloud Run, weil der Rest auch mit Cloud Run läuft. Dann überwiegt die Einfachheit des Tech- resp. Tool-Stack.