Dieser Artikel umfasst eine kurze Einführung in Apache Airflow für Neugierige und Einsteiger und zeigt im Anschluss auf, wie Apache Airflow mit Apache Beam kombiniert werden kann. Mehr Informationen zu Streaming-Applikationen, Apache Beam und Dataflow finden Sie in diesem Artikel hier.
In der heutigen digitalen Welt, in der verschiedene Funktionen und Anwendungen nahtlos ineinandergreifen, sind Workflow-Engines zu einem unverzichtbaren Werkzeug geworden. Nahezu jeder Geschäftsprozess, von der Datenverarbeitung über die durchgängige Integration bis hin zur Automatisierung von Aufgaben, kann als Abfolge von Schritten oder eben Workflows betrachtet werden.
Die Effizienz und Zuverlässigkeit dieser Workflows kann entscheidend für den Erfolg eines Unternehmens sein. Es gibt eine Vielzahl von Werkzeugen, die speziell für die Orchestrierung und Verwaltung dieser Workflows entwickelt wurden. Von einfachen Task Schedulern bis hin zu komplexen Workflow Management Systemen bieten sie die notwendige Infrastruktur, um Prozesse effizient zu definieren, auszuführen und zu überwachen.
Cloud Composer ist eine verwaltete Version von Apache Airflow, die von Google auf der Google Cloud Platform (GCP) angeboten wird. Es kombiniert die Stärken von Apache Airflow mit der Skalierbarkeit und Zuverlässigkeit der GCP. Damit können Unternehmen komplexe Workflows erstellen und ausführen, ohne sich um die Infrastruktur und deren Wartung kümmern zu müssen. Ein wesentlicher Vorteil des Einsatzes von Cloud Composer ist der Open-Source-Charakter von Apache Airflow. Durch den Einsatz einer Open-Source-Lösung reduzieren Unternehmen das Risiko eines Vendor-Lock-ins und behalten die Flexibilität, ihre Workflows bei Bedarf auf andere Plattformen zu migrieren. Gleichzeitig profitieren sie von der grossen Community und den kontinuierlichen Verbesserungen, die die Gemeinschaft in das Projekt einbringt. (Stichwort: Grösse der Airflow-Community)
Moderne Workflow-Systeme müssen eine Reihe von Anforderungen erfüllen, um den vielfältigen Bedürfnissen heutiger Produkte und Organisationen gerecht zu werden. Dazu zählen u.a. die Konfigurations- und Anpassungsfähigkeiten, aber auch nicht funktionale Anforderungen wie Fehlertoleranz und Überwachung. Verschaffen wir uns zunächst einen Überblick, indem wir das Profil von Apache Airflow im Vergleich zu anderen Werkzeugen in der dieser Kategorie schärfen:
![]() |
Abb. 1: Gegenüberstellung von bekannten Workflow-Lösungen |
Apache Airflow kann mit gutem Gewissen zu den umfassendsten Workflow-Systemen gezählt werden. Aber warum fehlen in der Übersicht Grössen wie SAP Signavio, Camunda oder Bizagi? Es ist wichtig zu verstehen, dass Apache Airflow keine BPMN-Software ist. Mit Apache Airflow programmieren Sie die Workflows in Python. BPMN-Software bieten die Möglichkeit, dass Personen ohne Programmierkenntnisse Workflows grafisch als BPMN-Prozess definieren können. Das bietet Vor- und Nachteile. Wenn Entwickler die Workflows definieren und diese über die normale Source-Code-Verwaltung verwalten und CD/CI-Pipeline bereitstellen, ist Apache Airflow auf jeden Fall eine gute Wahl. Wenn Sie wollen, dass Business Analysten ohne Programmierkenntnisse selber Prozesse implementieren können, werden Sie eine BPMN-Software brauchen. Es gibt aber auch hinsichtlich der Ausführung einen wichtigen Unterschied: Apache Airflow bietet zwar ein GUI an, aber nur für das Management der Workflows, nicht als Enduser-GUI für einzelne Schritte. Brauchen Sie für einzelne Schritte ein GUI, müssten Sie dies selber implementieren. Die meisten BPMN-Lösungen bieten out of the box GUI für Enduser an. Apache Airflow eignet sich eher weniger für Workflows, in denen es eine sogenannte Human Interaction braucht. Für Prozesse à la: "Benutzer A erfasst Daten, Benutzer B gibt sie frei" sind BPMN-Lösungen in der Regel die besser Wahl. Apache Airflow wurde für rein technische Workflows konzipiert. Wenn Sie nach wie vor, oder jetzt erst recht überzeugt sind, dass Apache Airflow das richtige für Ihr Vorhaben ist, wünsche ich viel Spass beim Weiterlesen.
Etwas Geschichte
Bevor wir uns anschauen, wie Apache Airflow funktioniert, darf eine Würdigung seiner ursprünglicher Entwickler an dieser Stelle nicht fehlen. Apache Airflow wurde ursprünglich von Airbnb entwickelt, um die internen Workflow-Prozesse des Unternehmens zu automatisieren und zu orchestrieren. Das Projekt begann im Jahr 2014, als Maxime Beauchemin, damals Dateningenieur bei Airbnb, ein flexibles und skalierbares Workflow-Management-System benötigte, um den wachsenden Anforderungen des Unternehmens gerecht zu werden. Airflow wurde innerhalb des Unternehmens schnell weiterentwickelt. Im Jahr 2016 beschloss Airbnb, das Projekt als Open-Source-Software freizugeben, um von der grösseren Entwicklergemeinschaft zu profitieren und damit die Weiterentwicklung zu fördern. Kurz darauf wurde Airflow ein Incubator-Projekt der Apache Software Foundation und erhielt 2019 den Status eines Top-Level-Projekts. Ich habe Maxime Beauchemin im Rahmen dieses Artikels leider erfolglos um ein Statement zur damaligen Wahl von DAG als Grundkonzept gebeten.
1, 2 oder 3?
Aktuell ist Apache Airflow bei der Hauptversion 2. Die Version 3 wurde kürzlich für das Frühjahr 2025 angekündigt:
Version 1: 28.03.2016 (Maxime Beauchemin stellt es erstmals auf GitHub)
Version 2: 18.12.2020 (erfolgt als Apache Airflow Projekt)
Version 3: Angekündigt für Frühjahr 2025
Für Apache Airflow 3 wurden zahlreiche Neuerungen angekündigt. Es befindet sich Stand heute noch in Entwicklung und konkrete Features können sich daher noch ändern. Für alle Sichtbar wird sicher die neue Benutzeroberfläche sein, welche Apache Airflow bekommen soll. Wichtig ist, dass Apache Airflow 3 nur noch Python-Code in der Version 3.9+ unterstützt. Stellen sie also sicher, dass Ihre Implementierung mit einer aktuellen Version von Python kompatibel ist. Mehr Informationen zur Version 3 gibt es bspw. hier.
Etwas Graphentheorie
Die Graphentheorie ist ein Teilgebiet der diskreten Mathematik, das sich mit der Untersuchung von Graphen beschäftigt. Ein Graph besteht aus einer Menge von Knoten (auch Ecken genannt) und einer Menge von Kanten, die diese Knoten paarweise verbinden. Formal lässt sich ein Graph G als ein Paar (V, E) darstellen, wobei V die Menge der Knoten und E die Menge der Kanten ist.
Graphen können gerichtet oder ungerichtet sein. In gerichteten Graphen haben die Kanten eine Orientierung, d.h. sie zeigen von einem Knoten zu einem anderen. In ungerichteten Graphen sind die Kanten symmetrisch.
Die Graphentheorie bietet ein breites Spektrum an Konzepten und Methoden zur Analyse von Graphen. Dazu gehört beispielsweise auch Isomorphismus (Graphenisomorphie). Zwei Graphen A und B heissen isomorph, wenn es eine bijektive Abbildung (eine eineindeutige Zuordnung) zwischen den Knotenmengen von A und B gibt, die die Kantenstruktur erhält. Es ist die Eigenschaft zweier Graphen, strukturell gleich zu sein.
![]() |
Abb. 2: Zwei inhaltlich gleiche (isomorphe) Graphen, trotz unterschiedlicher Darstellung (Quelle: Wikipedia) |
In Apache Airflow definiert man einen Workflow in einem sogenannten DAG. DAG steht für Directed Acyclic Graph. Es handelt sich also um einen gerichteten, azyklischen Graph. Azyklisch bedeutet, dass keine (gerichtete) Kante von einem Knoten X auf einen Knoten Y zeigt, von welchem man wiederum über Kanten zurück zu X gelangen kann.
Im Zusammenhang mit Apache Airflow ist die Kenntnis von Isomorphismus dahingehend hilfreich, als dass man in Apache Airflow Graphen auf verschiedene Arten definieren kann und grafisch dargestellt sehen diese dann unter Umständen auch unterschiedlich aus. Sind zwei Varianten aber isomorph, so ist auch das Ergebnis der Ausführung identisch.
Überblick über Apache Airflow
Programmiersprache:
Apache Airflow ist in Python geschrieben und DAG werden ebenfalls in Python geschrieben. Aktuell können die Graphen selber (also der DAG) nur in Python implementiert werden. Es ist aber im Rahmen von Python durchaus möglich, die auszuführende Logik in anderen Sprachen zu implementieren. Dafür können verschiedene Ansätze verfolgt werden. Über einen Subprozess kann ein Script lokal ausgeführt werden. Oder über das FFI (Foreign Function Interface) kann die Logik in C oder C++ implementiert werden. Dafür stellt Apache Airflow verschiedene Module zur Verfügung: ctypes, cffi, oder Cython. Selbstverständlich können auch externe Services aufgerufen werden, welche die eigentliche Logik implementieren. Wenn es sich nicht um CPU intensive Rechenprozesse handelt, lohnt sich der Aufwand in der Regel nicht, eine C/C++ Implementierung zu verwenden. Das Ausführen von Bash-Scrips/Befehlen kann aber für einfache aufgaben durchaus seinen Charme haben.
Bestandteile und Begriffe
Schauen wir uns die wichtigsten Begriffe und Konzepte von Apache Airflow an:
DAG
Ein DAG in Airflow ist technisch gesehen eine Instanz der Klasse DAG aus dem Modul airflow. Inhaltlich ist es eine Sammlung von Tasks (siehe nachfolgend) mit Metadaten zur Ausführung. Ein DAG hat eine Id und in der Regel eine Ausführungsplan, optional Standardargumente. Ein DAG kann auf unterschiedliche Arten erstellt werden. Siehe dazu auch "TaskFlow" weiter unten.
Operator
Ein Operator ist konzeptionell eine Vorlage für eine vordefinierte Aufgabe, die Sie einfach deklarativ in Ihrem DAG definieren können. Technisch gesehen ist es eine Klasse. Beispiele sind das Starten einer Python-Funktion, das aufrufen eines Bash-Shell-Befehls oder einer Datenbankabfrage. Ein Operator definiert die Logik für die jeweilige Aktion. Es gibt diverse Operatoren zur Auswahl und in der Regel reichen diese aus. Insb. da mit dem PythonOperator ein Operator zur Verfügung steht, welcher eine Python-Funktion ausführt. In dieser Funktion kann eine beliebige Logik implementiert werden. Hier finden Sie alle Operatoren die im Standard vorhanden sind. Dazu gibt es viele Operatoren von Drittanbietern, die installiert werden können. Eine Übersicht finden Sie hier.
Es ist auch möglich, einen eigenen Operator zu implementieren. Dazu muss von der Klasse BaseOperator geerbt werden und die Methode execute(self, context) muss entsprechend implementiert werden. Der Overhead vom Klassenrumpf zum PythonOperator mit einer einfachen Funktion lohnt sich in der Regel aber nur, wenn die Implementierung sehr spezifisch ist und das Konstrukt einer Klasse erfordert und/oder wenn diese Funktion häufig wiederverwendet wird.
Task
Ein Task ist konzeptionell eine konkrete Instanz eines Operators, ergänzt um Metadaten wie Start- und Endzeit sowie dem Status. Technisch gesehen ist es eine Kombination der instanzierten Klasse des Operators und den Metadaten zu diesem Task, die Apache Airflow in der Metadaten-Datenbank führt. Im Code liegt Wahrheit. Schauen wir uns das Verhältnis von Operator und Task an einem konkreten Code-Beispiel an:
Wir verwenden hier einen PythonOperator, welcher die Funktion print("Hello World!") ausführt. Der Operator bekommt zusätzlich die Task-Id: "print_hello_wordl". Die Variable "task" ist die instanzierte Klasse PythonOperator mit der gegebenen Id und der definierten Funktion. Zusätzlich führt Airflow für den Task mit der Id: "print_hello_world" u.a. die Metadaten zum Status und der effektiven Start- und Endzeit.
Sensor
Ein Sensor ist eine spezielle Art von Operator, der genau eine Aufgabe hat: Er wartet darauf, dass etwas passiert oder vorhanden ist. Das kann rein zeitbasiert sein oder eine Prüfung ob eine bestimmte Datei existiert oder sonst ein externes Ereignis. Sobald dies eintrifft, beendet er seine Aufgabe, damit die nachgelagerten Aufgaben ausgeführt werden können. In der Regel kann man festlegen, wie lange man maximal warten möchte, bevor der DAG ohne Erfolg abgebrochen wird.
Scheduler
Der Scheduler startet und überwacht die zeitliche Ausführung von DAG. Se können definieren, wann er starten soll und wie lange er ausgeführt werden soll. Dazu haben Sie verschiedene Möglichkeiten, ein Intervall zu definieren. Dazu kann auch die bekannte Cron-Syntax verwendet werden. Der Scheduler ist auf Stufe DAG.
Trigger
Ein Trigger legt die Bedingungen fest, bei welchen ein Task ausgeführt wird. Der Trigger ist entsprechend auf Stufe Task. Standard ist, dass ein Task unmittelbar ausgeführt ist, wenn er gemäss Graph an der Reihe ist und alle vorherigen Aufgaben erfolgreich waren. Dieses Verhalten kann aber mit Trigger-Rules pro Task abweichend konfiguriert werden. Es gibt dazu verschiedene mögliche Rules wie: all_failed, one_failed, one_success etc. Damit lassen sich auch Fehlerfälle spezifizieren. Mehr dazu später.
Executor
Der Executor ist eine Hauptkomponente von Apache Airflow. Er führt die in Python definierten Task effektiv aus. Für diese Ausführung gibt es verschiedene Strategien und Möglichkeiten. Entsprechend bietet Apache Airflow mehrere Executor an. Die Wahl des Executor wird in der Konfigurationdatei airflow.cfg definiert. Grob wird zwischen lokalen und remote Executor unterschieden. Hier eine Auswahl an möglichen Executoren:
- Lokal:
- LocalExecutor
- SequentialExecutor - Remote:
- CeleryExecutor
- BatchExecutor
Die Wahl des geeigenten Executor hängt vor allem von der Arbeitslast ab und der Frage, ob und wie parallel Tasks ausgeführt werden sollen. Informationen zu Executor finden Sie hier.
Task-Context
Der Task-Context ist ein Dictionary mit Meta-Informationen zum Task, bspw. der Startzeitpunkt. Via Task-Context kann man bspw. auch auf XCom-Werte zugreifen, mehr dazu unter XCom. Der Task-Context wird als Dictionary einem Operator automatisch als Argument übergeben.
Jinja Template
Apache Airflow unterstützt direkt die Jinja Template Engine. Dies kann sehr nützlich sein, um bspw. dynamische Dateinamen zu definieren, E-Mail-Templates zu erstellen etc. Programm-Logik wird dabei in Geschweiften-Prozentklammern geschrieben: {% Logik %} und die Referenzierung auf Variablen mit doppelten Geschweiften Klammern: {{ Wert }}. Es lassen sich damit sogar dynamische Bash-Kommandos erstellen. Die Möglichen Einsatzszenarien sind vielfältig.
Task-Dependency
Wenn man mehrere Tasks ohne Abhängigkeiten in einem DAG definiert, werden alle parallel ausgeführt. Dies ist in der Regel nicht gewollt. Vielmehr möchte man einen bestimmten Flow von Abhängigkeiten definieren. Apache Airflow bietet dazu verschiedene Möglichkeiten.
Direkte 1:1 Abhängigkeit:
Nach Task A folgt Task B. Dies kann mit dem überschriebenen Bit-Shift-Operator gemacht werden: task_a >> task_b. Alternativ kann auch die Methode set_downstream verwendet werden: task_a.set_downstream(task_b). Oder umgekehrt set_upstream. Üblicherweise wird aber die Bit-Shift-Notation verwendet.
Direkte 1:n oder n:1 Abhängigkeit:
Wenn man von einem Task auf mehrere parallele Task gehen will und diese anschliessend wieder auf einen zusammenführen will, kann man dies mit einem Array machen: task_a >> [task_b, task_c] >> task_d. In diesem Fall wird zuerst task_a ausgeführt. Danach werden task_b und task_c parallel ausgeführt und schliesslich, je nach Definitionen, wenn beide abgeschlossen sind, wird task_d ausgeführt. set_up|downstream kann ebenfalls ein Array übergeben werden.
Verzweigung:
Wenn man von Task A einen Zweig mit Task B und C und einen eigenen Zweig mit Task D und E machen will, kann man dies einfach mit mehreren direkten Abhängigkeiten machen:
task_a >> task_b >> task_c
task_a >> task_d >> task_e
In diesem Fall werden task_b und task_d gleichzeitig ausgeführt. Um diese wieder auf einen Task F zusammen zu führen, müssen alle Enden (Tasks) auf diesen Task F zeigen:
[task_b, task_c, task_d, task_e] >> task_f
Komplexe Abhängigkeit/Verzweigung:
Wir haben gesehen, dass man mit der Kondition steuern kann, wann ein Task ausgeführt werden soll. Wenn nun die Verzeigung komplexer ist, reicht dies nicht aus. Wenn Sie bspw. nach Task A drei Tasks zur Auswahl haben, von denen einer, basierend auf Task A oder bspw. auf externen Daten, für den nächsten Schritt ausgewählt werden soll, dann können Sie das mit einem speziellen Task mit dem BranchPythonOperator umsetzen. Der BranchPythonOperator gibt eine Task-Id (Str) zurück. Damit können Sie mit den Mitteln von Python frei eine Logik implementieren, wie der nächste Task ermittelt werden soll. Damit können Sie dann eine direkte Abhängigkeit auf diesen Task definieren und dieser Task definiert dann den nächsten Task:
task_a >> task_branch_python_operator
Task-Group
Wir haben gesehen, wie Abhängigkeiten zwischen Task definiert werden können. Für eine Hand voll statischer Tasks ist das ideal. Haben wir dutzende ohne mehr von Tasks, die parallel ausgeführt und wieder zusammengeführt werden sollen, wird es mühsamer. Wollen wir Tasks sogar dynamisch anhand von externen Informationen erstellen, ist es so nicht mehr möglich. Hier kommen die Task-Gruppen zum Einsatz. Sie können Tasks einfach in Gruppen gruppieren und die Abhängigkeiten auf die ganze Gruppe anwenden. Am einfachsten erstellen Sie eine Task-Gruppe mit dem "with"-Statement. Bspw.:
with TaskGroup("task_group") as task_group:
task_a = DummyOperator(task_id='do_seomething_1')
task_b = DummyOperator(task_id='do_seomething_2')
Dann können Sie einfach eine Abhängigkeit auf die Gruppe definieren:
task_c >> task_group
Die Tasks a und b werden so parallel ausgeführt.
Wobei es auch möglich, innerhalb der Gruppe eine Reihenfolge zu definieren, so dass die Tasks innerhalb einer Gruppe nicht parallel ausgeführt werden. Dies ist einfach mit dem Downstream-Operator (>>) möglich:
with TaskGroup("task_group") as task_group:
task_a = DummyOperator(task_id='do_seomething_1')
task_b = DummyOperator(task_id='do_seomething_2')
task_b >> task_a
task_c >> task_group
Damit wird wieder zuerst Task c ausgeführt und danach die Task-Gruppe. Innerhalb dieser wird aber zuerst Task b ausgeführt und erst nachher Task a.
Hook
Ein Hook in Apache Airflow ist eine standardisierte, wieder verwendbare Verbindung zu einem externen System. Es gibt viele Hooks von Drittanbieter, bspw. für Datenbanken und Speicherzugriffe. Die Klasse BaseHook bietet die Schnittstelle der Verbindung und der Ausführung. Hooks werden in der Regel in den Funktionen für Operatoren verwendet. Eine Übersicht von verfügbaren externen Hooks finden Sie hier.
PlugIn
Ein PlugIn ist vereinfacht gesagt ein Python-Modul, welches an einem definierten Ort gespeichert ($AIRFLOW_HOME/plugins) wird und einen Operator, Sensor oder einen Hook etc. implementiert. Airflow lädt die PlugIns automatisch, entsprechend vereinfacht ist deren Nutzung in DAG. Weitere Informationen zu PlugIns finden Sie hier.
Provider
Ein Provider in Airflow ist ein spezifisches Plugin für die Verbindung mit einem Drittsystem, bspw. der Google Cloud. Ein "Provider-PlugIn" stellt i.d.R. verschiedene Operatoren, Sensoren und Hooks zur Verfügung, um mit dem Drittsystem zu interagieren.
Anmerkung: Viele Provider sind relativ gross mit sehr vielen Abhängigkeiten. Oftmals ist es effizienter, wenn man gezielt direkt eine notwendige Library verwendet als einen ganzen Provider, wenn man davon nur eine Funktion nutzt.
XCom
Tasks in Airflow teilen grundsätzlich keine Daten. XCom (Cross-Communication) bietet die Möglichkeit, dass Tasks Informationen untereinander teilen können. Sobald ein Operator einen Rückgabewert liefert, bspw. return "Hello World!", so wird dieser Wert als Key-Value-Pair mit der Task-Id als Schlüssel in XCom gespeichert. Ein Task kann dann via Key-Value-Argumente auf Werte in XCom zugreifen. Dies wiederum via Task-Id:
task_instance = kwargs['ti']
pulled_message = task_instance.xcom_pull(task_ids='push_task_id')
Werte in XCom werden in der Airflow-Metadatenbank gespeichert und sind nur während der Laufzeit des DAG verfügbar. XCom ist für kleine Daten gedacht. Die Speicherlimitierung hängt von der verwendeten Airflow-Metadatenbank ab. Wichtig zu beachten ist, dass grosse Daten in XCom auch einen negativen Einfluss auf die Performance haben. Wenn grosse Daten unter Tasks ausgetauscht werden sollen, ist der Einsatz einer InMemory-Datenbank bspw. via Redis zu prüfen.
TaskFlow
TaskFlow ist eine neue API in Airflow 2.0, die das Erstellen von DAG vereinfacht. Im Gegensatz zur traditionellen Methode, bei der DAGs durch Instanziierung von Operatoren und dem Setzen von Abhängigkeiten aufgebaut werden, ermöglicht TaskFlow die Definition von DAG auf eine mehr deklarative Art und Weise. die TaskFlow-API baut dabei hauptsächlich auf drei Dekoratoren auf. Die Funktionen kennen Sie bereits, es ist nur eine andere Schreibweise:
@dag
Mittels DAG-Dekorator kann auf einfache Art und Weise ein DAG definiert werden:
@dag(
dag_id='my_dag_id',
start_date=datetime(2024, 10, 1),
schedule_interval='@daily'
)
def my_dag():
# Hier definieren Sie die Tasks
pass
@task
Dasselbe für einen Task:
@task
def extract_data():
# Extrahieren Sie die Daten hier
return data
Verwirrend bei den Tasks ist manchmal die Verwendung von Operatoren. Mit dem Task-Dekorator brauchen Sie den PythonOperator eigentlich nicht, da Sie den auszuführenden Code direkt in der mit @task dekorierten Funktion definieren. Wenn Sie einen anderen Airflow-Deokrator brauchen, können Sie diesen klassisch instanzieren. TaskFlow und "klassich" können entsprechend gemischt verwendet werden:
@dag(
dag_id='my_dag_id',
start_date=datetime(2024, 10, 1),
schedule_interval='@daily'
)
def my_postgres_dag():
@task
def do_something():
# Hier die Logik implementieren
pass
exec_sql = PostgresOperator(
task_id='exec_sql',
postgres_conn_id='my_postgres_conn',
sql='SELECT ...'
)
do_something() >> exec_sql
@task_group
Der Task-Group-Dekorator vereinfacht die Definition von Gruppen:
@task_group(group_id='group_id')
def process_data():
@task
def task_1():
# Logik hier
pass
@task
def task_2():
# Logik hier
pass
Es ist mit dem Task-Group-Dekorator auch einfach möglich, Tasks dynamisch, bspw. mit einer Schleife zu erzeugen:
@task_group(group_id='group_id')
def process_data_group():
for i in range(5):
@task(task_id=f'process_task_{i}')
def process_task(i):
# Logik hier
return i
process_task(i)
Damit lassen sich bspw. 5 Tasks auf elegante Weise erzeugen.
Hinweis zur Performance: Dekoratoren in Python erzeugen Wrapper-Funktionen, die sich auf die Ausführungsgeschwindigkeit auswirken. Es gilt bei Dekoratoren jeweils abzuwägen zwischen syntaktischem Zucker und Ausführungsgeschwindigkeit. Airflow eignet sich nicht für kurze hoch performante Workflows. Aus diesem Grund spielt die Verwendung von Dekoraten in der Regel keine grosse Rolle.
Backfill
Backfill in Apache Airflow ist eine Funktion, die es ermöglicht, vergangene DAG-Runs nachträglich auszuführen, um sicherzustellen, dass alle Tasks für einen bestimmten Zeitraum ausgeführt wurden, auch wenn der DAG in diesem Zeitraum noch nicht aktiv oder verfügbar war. Backfill kann man konfigurieren oder via CLI forciert ausführen, auch wenn es nicht definiert ist. Konfigurativ kann man die DAG-Option catchup auf True setzen. Via CLI kann die Backfill-Funktion wie folgt aufgerufen werden:
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
Mehr Informationen zu Backfill finden Sie hier.
DAG starten
Bislang haben wir mit dem Scheduler definiert, wann ein DAG ausgeführt werden soll. Dies ist für regelmässig auszuführende Workflows ideal. Es gibt jedoch auch Use Cases, bei denen die Ausführung unregelmässig ist und manuell respektive von einem Drittsystem angestossen wird. Neben dem Scheduler gibt es drei weitere Möglichkeiten, einen DAG zu starten:
Airflow UI:
Im Web-UI auf der DAG-Übersicht (/home) finden Sie pro DAG auf der rechten Seite das "Play"-Symbol mit der Bezeichnung "Trigger DAG". Damit können Sie einen DAG manuell starten.
CLI:
Über das Command-Line-Interface von Airflow können Sie mit folgendem Befehl einen DAG manuell starten:
airflow dags trigger <dag_id>
Die CLI-Referenz von Airflow finden Sie hier.
API:
Airflow bietet eine Rest-API-Schnittstelle an. Mit dieser können Sie einen DAG folgendermassen manuell starten:
URL-Syntax: /api/v1/dags/{dag_id}/dagRuns
Beispiel:
curl -X POST "http://<your-airflow-host>/api/v1/dags/your-dag-id/dagRuns" \
-H "Content-Type: application/json" \
-H "Authorization: Basic <base64_encoded_credentials>" \
-d '{"conf": {}, "execution_date": "2024-10-01T12:00:00"}'
Die Konfiguration ist dabei optional, die Autorisierung kann konfiguriert werden.
Die Rest-API-Spezifikation finden Sie hier.
Fehlerbehandlung
Die Fehlerbehandlung in einem Workflow ist von zentraler Bedeutung. Airflow bietet verschiedene Werkzeuge um eine adäquate Fehlerbehandlung zu implementieren.
Task-Retry:
Sie können generell für einen DAG definieren, dass ein Task bei einem Fehler eine definierte Anzahl Male wiederholt ausgeführt wird. Dazu kann eine Verzögerung festgelegt werden:
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
Diese DAG-Konfiguration kann auf Stufe Task gesetzt oder überschrieben werden:
my_task = BashOperator(
task_id='my_task',
bash_command='echo "Hello World"',
retries=2,
retry_delay=timedelta(minutes=3),
dag=dag
)
Zusätzlich kann der Parameter retry_exponential_backoff=True gesetzt werden. Dann wird der Wert von retry_delay exponentiell verwendet. Die Berechnung erfolgt dabei wie folgt:
delay = retry_delay * 2^(n-1)
Wobei n die Anzahl Versuche ist. Bei einer retry_delay von 2 Minuten beträgt die effektive Verzögerung beim dritten Versuch demnach: 2 * 2^(3-1) = 8 Minuten. Zusätzlich wird noch ein Jitter hinzugefügt, so dass die effektive Verzögerung etwas abweicht. Das Ziel ist es, dass das System nicht durch zu viele gleichzeitige wiederholende Tasks zu stark belastet wird.
Timeouts:
Sie können, wieder auf Stufe DAG (für alle Tasks) oder auf Stufe Task einen Parameter setzen, welcher die maximale Ausführungszeit definiert:
execution_timeout=timedelta(minutes=10)
Dauert ein Task länger, wird es abgebrochen. Damit können Sie ungewollte long running tasks vermeiden.
Alerting:
Sie können einstellen, ob Sie bei Fehlern und/oder bei Wiederholungsversuchen (retries) per E-Mail benachrichtig werden möchten:
default_args = {
'email': ['incident@example.com'],
'email_on_failure': True,
'email_on_retry': False,
}
Das obige Beispiel sendet nur bei Fehlern eine E-Mail, nicht aber bei retries.
Callbacks:
Sie können sowohl auf Task-Ebene wie auch auf DAG-Ebene Callback-Funktionen definieren, wie im Falle eines Fehlers aufgerufen werden. Damit können Sie eine individuelle Behandlung dieser implementieren:
def task_failure_callback(context):
task_instance = context['ti']
dag_id = context['dag'].dag_id
print(f"Task {task_instance.task_id} in DAG {dag_id} failed!")
task = PythonOperator(
task_id='your_failing_task',
python_callable=your_task_function,
on_failure_callback=task_failure_callback
)
Hinweis: Die Funktion your_task_function ist im Beispiel oben nicht definiert.
Auf Stufe DAG könnte es so aussehen:
def dag_failure_callback(context):
dag_run = context['dag_run']
print(f"DAG {dag_run.dag_id} failed!")
with DAG(
'your_dag',
default_args=default_args,
on_failure_callback=dag_failure_callback
) as dag:
...
Callbacks sind ein mächtiges Werkzeug, um passende Fehlerbehandlungen zu implementieren.
Trigger-Rules:
Sie lernten bereits die Trigger-Rules kennen. Insbesondere die beiden Typen "all_failed" und "one_failed" können natürlich hervorragend für die Fehlerbehandlung genutzt werden, in dem alternative Pfade im Falle eines Fehlers eingeschlagen werden können.
XCom und BranchPythonOperator:
Die beiden Werkzeuge XCom und BranchPythonOperator kennen Sie bereits. Diese kombiniert können Sie natürlich ebenfalls zur individuellen Fehlerbehandlung nutzen. Sie fangen in Ihrem Python-Code einen Fehler ab und speichern dies in XCom. Dann nutzen Sie einen BranchPythonOperator, welcher basierend auf den Werten in XCom entscheidet, wie es weiter geht. Im Falle eines Fehlers, können Sie so ebenfalls eine alternative Route einschlagen.
Testing
Es gibt hier nichts zu beschönigen. Workflows zu testen ist harte Arbeit. Und das liegt nicht an Apache Airflow, vielmehr in der Natur der Sache. Workflows agieren in der Regel mit externen Systemen, Datenbanken, Filesystem, API etc. Das macht das Testing vielschichtig und aufwändig. Es gilt drei Bereiche getrennt zu beachten:
- Ihre eigene Business-Logik, die Sie implementieren, ggf. unter Verwendung von Dritt-Bibliotheken.
- Der DAG mit seinen Tasks und Abhängigkeiten
- Die Systemumgebung mit Drittsystemen wie Datenbanken etc.
Ihre eigene Business-Logik testen Sie am besten wie gewohnt, Sie werden dafür wahrscheinlich entsprechende Unit-Test-Verfahren im Einsatz haben.
Der DAG mit seinen Tasks und Abhängigkeiten können Sie mit einem Testframework wie pytest Link testen. Beachten Sie dazu folgendes: Operatoren die keinen Task-Context brauchen, können Sie grundsätzlich direkt einzeln mit execute() ausführen und so testen. Operatoren die einen Task-Context brauchen, können Sie nur zusammen mit einen DAG sinnvoll testen.
Tipp: DAG steht bekanntlich für Directed Acyclic Graph. Sie können aber problemlos einen zyklischen Graphen definieren: task_a >> task_b >> task_a. Airflow reklamiert erst bei der Ausführung. Sie können ein DAG einfach mit dag.test_cycle() auf diesbezügliche Konformität im Rahmen eines Tests prüfen.
Die Systemumgebung ist etwas aufwändiger zum Testen. Hier kann Whirl nützlich sein. Whirl erstellt auf Basis von Docker und Docker Composer konfigurierbare Airflow-Umgebungen, die produktionsnahe sein sollten. Whirl finden Sie hier auf GitHub: Link
Hier finden Sie ein YouTube-Video aus dem Jahr 2019, wo Whirl ab ca. Min. 7:50 vorgestellt wird: Link
Installation:
Apache Airflow ist grundsätzlich schnell und einfach installiert. Die Frage ist mehr, welches Betriebsmodell geeignet ist. Es gibt für die Installation verschiedene offizielle Möglichkeiten:
- Direkt von der Release-Ressource
- Via PyPI
- Via Docker-Image
- Via Helm Chart
Daneben gibt es natürlich verschiedene "Airflow as a Service"-Angebote, u.a.:
Und last but not least: Cloud Composer von Google. Diese Option schauen wir uns gleich etwas genauer an.
Als Entwickler empfehle ich je nach Präferenz für die lokale Nutzung die Installation via PyPI oder via Docker-Image.
Um Airflow lokal via pip/PyPI zu installieren, können Sie den folgenden Befehl verwenden:
pip install "apache-airflow[celery]==2.10.2" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.8.txt"
Mehr Informationen dazu finden Sie hier.
Drittbibliotheken:
Wenn Sie Drittbibliotheken, bspw. in einem Operator, verwenden, ist es wichtig, dass diese in der richtigen Umgebung installiert ist. Airflow läuft in der Regel in einer virtuellen Umgebung. Wenn Sie Airflow via Docker-Image betreiben ist es wichtig, dass die Drittbibliothek auch in diesem Container installiert wird. Wenn Sie Airflow via Managed Service, bspw. Cloud Composer, verwenden, können Sie eine aktuelle requirements.txt-Datei verwenden, damit der Service die Drittbibliothek entsprechend installiert. Informationen zur requirements.txt-Datei finden Sie hier. Selbstverständlich muss die Drittbibliothek die Python-Version unterstützen unter welcher Airflow läuft. Ansonsten können Sie die Bibliothek nach der Installation wie gewohnt verwenden.
Community:
Die Entwickler- und Nutzergemeinschaft von Apache Airflow wächst rasant. dies zeigt die folgende Grafik, mit Vergleich zu Apache Spark oder Apache kafka:
Die belegt die Tatsache, dass Airflow ein etabliertes Produkt mit hervorragenden Zukunftsaussichten ist.
Damit sind wir am Ende der kurzen Einführung über Apache Airflow. Ich hoffe, es gab Ihnen einen guten ersten Überblick über das Werkzeug und einen guten Einstieg für das Eintauchen mit anderen Quellen.
Ab hier widmen wir uns der Diskussion über die Architektur und einem ganz konkreten Einsatzszenario. Dabei sehen wir uns die Möglichkeit an, Apache Airflow mit Apache Beam zu kombinieren.
Apache Airflow-Applikation mit Cloud Composer betreiben
Wenn Sie Airflow auf der GCP nutzen wollen, ist die bevorzugte Variante aus meiner Sicht Cloud Composer. Es sei aber an dieser Stelle auch erwähnt, dass Sie natürlich die Möglichkeit haben, Apache Airflow auf der GCP selber mittels VM oder GKE zu betreiben. Dazu stehen Ihnen die bereits bekannten offiziellen Installationsmöglichkeiten (Docker-Container, Helm Chart etc.) zur Verfügung.
1, 2 oder 3?
Cloud Composer gibt es (auch) in den Versionen 1-3. Die Cloud Composer-Versionen haben allerdings nichts mit der jeweiligen Apache Airflow-Version zu tun, resp. diese müssen nicht identisch sein. In Cloud Composer 3 läuft bspw. zurzeit Apache Airflow 2. Die Cloud Composer-Version 1 ist deprecated und die Version 3 ist in Pre-GA, also noch nicht offiziell verfügbar (in Preview):
Für neue Projekte mit Cloud Composer empfehle ich die Version 3 zu nutzen. Ich gehe davon aus, dass diese in Kürze offiziell verfügbar sein wird (GA) und somit produktiv genutzt werden kann. Die Unterschiede zwischen den Cloud Composer-Versionen finden Sie im Detail hier.
Um Cloud Composer zu aktivieren, haben Sie die gewohnten Möglichkeiten der GCP:
Konsole
Mit wenigen Klicks können Sie Cloud Composer im Web-UI aktivieren:
https://cloud.google.com/composer/docs/composer-3/create-environments#console
https://cloud.google.com/composer/docs/composer-3/create-environments#console
CLI
Wenn Sie die CLI (gcloud) aktiviert haben, ist die Aktivierung via Command Line gewohnt einfach:
API
Wird meiner Erfahrung nach eher selten direkt genutzt, aber via Rest-API ist die Aktivierung ähnlich zur CLI:
Terraform
Last but not least, kann die Aktivierung natürlich auch über die für produktive Systeme bevorzugte Variante mittels Terraformerfolgen:
Speicherort der Python-Programme
In Cloud Composer werden die DAG (Python-Programme) in einem spezifischen Cloud Storage Bucket gespeichert und von da automatisch in Apache Airflow synchronisiert. Sie finden den Speicherort dieses Bucket via Konsole (Web-UI) auf der Detailseite der DAG oder können diesen via CLI abfragen:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Speichern Sie Ihre DAG entsprechend in diesem Bucket um Sie von Apache Airflow ausführen zu können.
Cloud Composer aus Sicht IT-Architektur
Was haben Sie für eine (Service-) Architektur, wenn Sie einen zentralen Service wie Cloud Composer einsetzen? Wie ein Datenbank-Service werden Sie aus Kostengründen nicht für jeden Microservice, welcher einen DAG nutzt, eine eigene Composer-Instanz bereitstellen. Eher sinnvoll ist es, dass Sie pro Umgebung: Entwicklung, Testing, Integration, Produktion etc. eine eigene Instanz haben. Damit ist Cloud Composer eine zentrale Service-Komponente im Sinne einer Service-Orientierten Architektur (SOA). Je nach Kapselung der DAG und deren Eigenständigkeit, können Sie natürlich zur Argumentation gelangen, dass jeder DAG für sich ein Microservice darstellt und Cloud Composer/Apache Airflow die geteilte Runtime ist, wie dies auch bei anderen Microservice-Architekturen der Fall ist, wo eine gemeinsame Node.js-Instanz oder ein GKE für die Ausführung verwendet wird. Diese Sichtweise ist meine Empfehlung beim Design der DAG. Konzipieren Sie diese so, dass sie als eigener Microservice klassiert werden können. Damit sollten Sie, so lange Schnittstellen eingehalten werden, unabhängig von anderen Services gewartet und erweitert werden können. Definieren Sie Cloud Composer entsprechend als Runtime in Ihrer Architektur.
Workflow vs. Streaming-Like (Batch-) Prozesse
Nicht selten besteht ein Prozess aus verschiedenen Schritten, wovon nicht jeder gleichermassen gut geeignete ist für das eigentlichen Workflow-System. Es stellt sich aus architektonische Sicht die Frage, eignet sich für das Vorhaben Apache Airflow oder kann die Anforderungen nicht besser mit einem performanten Streaming-System abgedeckt werden?
An dieser Stelle der Überlegungen kommt Dataflow ins Spiel. Schaut man sich als Architekt die Landkarte von geeigneten Werkzeugen auf der Google Cloud an, schwankt man hin und wieder zwischen Apache Airflow und Apache Beam. Oder in der Welt von Google gesprochen, zwischen Cloud Composer und Dataflow. Dataflow kann hervorragend (auch) mit Batch-Prozessen umgehen und bietet daher bei einfachen (vor allem sequenziellen) Workflows eine nennenswerte Überschneidung mit Cloud Composer. Wann nutze ich nun Cloud Composer und wann Dataflow?
Das beste aus beiden Welten. Oder das eine tun und das andere nicht lassen.
Hier bietet sich manchmal eine klassische Win-Win-Situation mit einer sowohl-als-auch Entscheidung an.
Dataflow lässt sich hervorragend in Cloud Composer integrieren. Es gibt dafür offizielle Operatoren um Dataflow als Cloud Composer Task auszuführen.
Eine Prüfung dieser Option ist immer dann angezeigt, wenn der gesamte Workflow aus mehreren Schritten besteht, insbesondere wenn diese nicht rein sequenziell sind, und mindestens einer davon eher einen Streaming-Charakter oder zumindest einen grossen Workload mit Fokus auf Performance aufweist.
Bei der Integration von Dataflow in Cloud Composer hat man zwei Hauptvarianten:
- Dataflow im Cloud Composer-Prozess ausführen
- Dataflow extern als "echten" Dataflow-Prozess ausführen
(Der Vollständigkeitshalber sei hier noch die Option Dataflow SQL, als weitere mögliche Option erwähnt.)
Die Wahl der Variante hängt hauptsächlich davon ab, wie stark der Dataflow-Prozess skalieren muss oder wie gross er zu Beginn schon ist. Kurzlaufende kleine Prozesse lassen sich in der Regel problemlos direkt im Cloud-Composer-Prozess ausführen. Grössere, oder wenn man später rasch skalieren muss, werden bevorzugt als eigenstände Dataflow-Anwendungen ausgeführt.
Sogenannte non-templated Dataflow Pipelines werden im Airflow-Prozess ausgeführt. Templated Dataflow Pipilines entsprechend in einem normalen Dataflow-Prozess.
Non-templated Pipelines können Sie auf verschiedene Arten definieren: JSON, Python oder Java.
Für die Ausführung können Sie definieren, ob auf das Ende der Dataflow-Pipeline gewartet werden soll oder nicht. Dafür gibt es mit Batch, Streaming oder Blocking wiederum mehrere Möglichkeiten.
Templated Dataflow Pipelines ermöglichen, dass man den Apache Beam-Code auf Cloud Storage speichert und die Pipeline dann von dort via Dataflow ausgeführt wird. Eine Operator-Definition kann bspw. so aussehen:
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start_template_job",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
wait_until_finished=True,
)
Auch templated Job können Sie mit deferrable und wait_until_finished entsprechend konfigurieren.
Weitere Informationen, insbesondere auch dazu, wie Sie eine Dataflow Pipeline in Apache Airflow stoppen oder löschen können, finden Sie hier: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataflow.html
Ein konkretes Anwendungsbeispiel: EU Sanktions-Daten mit ESC indexieren
Genug der Theorie. Wir haben nun das Rüstzeug, für einen realen Use Case.
Finanzintermediäre sind in den meisten Ländern verpflichtet, Kundenbeziehungen und Transaktionen gegen Sanktionslisten zu prüfen. Es würde den Rahmen sprengen, hier auf fachliche und rechtliche Details einzugehen. Man könnte wortwörtlich ein Buch darüber schreiben: https://ronnyfuchs.blogspot.com/2010/08/name-matching-systeme-fur-compliance.html Das Buch ist leider vergriffen. Schreib mir in den Kommentar, wenn du mehr zum Thema wissen möchtest. Für unseren realen Use Case muss es reichen, dass wir Sanktionsdaten von einer externen Quelle herunterladen können.
Unser Use Case sieht wie folgt aus: Wir laden eine Sanktionsliste herunter, importieren diese in BigQuery, lesen jeweils ein Paket von 100 Einträgen und senden diese an den Index-Service, welcher diese indexiert. Sobald die Namen indexiert sind, können wir unscharf prüfen, ob Kunden oder Zahlungsempfänger von uns auf dieser Sanktionsliste stehen.
Wir nehmen für unser Beispiel die Sanktionsliste der Schweiz ("SECO-Liste"), welche gut formatiert im CSV-Format von "Open Sanctions" heruntergeladen werden kann: https://data.opensanctions.org/datasets/20241022/ch_seco_sanctions/targets.simple.csv?v=20241022120902-oux
Bitte beachten Sie die Lizenzen dieser Daten. Diese dürfen nur frei für nicht kommerzielle Zwecke verwendet werden.
Als Index- und Name-Matching-Lösung verwenden wir die Open-Source-Bibliothek ESC: https://esc.asderix.com/
Ich habe in einem Artikel bereits im Detail beschrieben, wie man mit dieser Bibliothek auf der GCP einen serverless Service zum Indexieren und Suchen aufsetzen kann. Den Artikel finden Sie hier: https://ronnyfuchs.blogspot.com/2024/05/cloud-serie-gcp-filestore-vs-cloud.html
Dataflow habe ich in diesem Artikel hier im Detail beschrieben: https://ronnyfuchs.blogspot.com/2024/05/cloud-serie-streaming-mit-apache-beam.html
Damit haben wir eigentlich bereits alles zusammen, was wir brauchen. In Anlehnung an Arc42 sieht unsere Lösung grob wie folgt aus:
Den vollständigen Source-Code gibt es auf GitHub: https://github.com/asderix/ComposerBasedEscIndexer
Den vollständigen Source-Code zu diesem DAG finden Sie ebenfalls auf GitHub: https://github.com/asderix/ComposerBasedEscIndexer
Kontextabgrenzung:
Wir fokussieren uns auf den Index-Service. Dieser lädt Sanktionsdaten herunter und indexiert diese. Das Abgleichen von Kunden- oder Transaktionsdaten gegen diesen Index ist die logische Nutzung, aber nicht im Scope der vorliegenden Umsetzung.
Laufzeitsicht:
![]() |
Abb. 7: Vereinfachte Laufzeitsicht der Anwendung |
Bein Cleanup-Operator gibt es verschiedene Möglichkeiten. Hier muss man entscheiden, ob man bspw. die Tabellen behalten will und mit Versionen arbeitet. Ob es pro Version eine neue Tabelle geben soll oder ob historisiert mittels Partionierung von BigQuery in dieselbe Tabelle gespeichert werden soll. Ebenso bei den Files. Im Beispiel lassen wir die Implementierung dieses Schrittes weg.
Verteilungssicht:
Die Beispiel-Applikation des ESC Microservice aus dem oben erwähnten Artikel müssen wir etwas modifizieren resp. erweitern. Wir wollen den Service nicht einzeln pro Person aufrufen. Wir senden dem Service Pakete von bis zu 100 Personen, die wir indexieren möchten. Die Code-Ergänzung besteht aus einem neuen Endpunkt, welcher eine Liste (ein JSON-Array) von Personen entgegen nimmt und indexiert und sieht wie folgt aus:
![]() |
Abb. 9: Code-Ausschnitt erweiterter ESC-Service |
Den vollständigen Source-Code gibt es auf GitHub: https://github.com/asderix/ComposerBasedEscIndexer
Apache Airflow
Der Airflow DAG lässt sich relativ kompakt implementieren:
![]() |
Abb. 10: Airflow DAG Code-Ausschnitt |
Den vollständigen Source-Code zu diesem DAG finden Sie ebenfalls auf GitHub: https://github.com/asderix/ComposerBasedEscIndexer
Dieser DAG sieht in Apache Airflow wie folgt aus:
Apache Beam
die Beam-Pipeline zeigt aus meiner sich exemplarisch, wie elegant man solche Anforderungen mit Apache Beam umsetzen kann:
![]() |
Abb. 12: Code-Ausschnitt Apache Beam Pipeline |
Auch hier finden Sie den vollständigen Source-Code auf GitHub: https://github.com/asderix/ComposerBasedEscIndexer
Deployment
Es gibt verschiedene Optionen, wie Sie die Anwendung deployen können. Es ist auch möglich, dass Sie zu lern- und testzwecken die Applikation lokal ausführen. Sie können Apache Airflow inkl. den Google- und Beam-Modulen lokal installieren und entsprechend lokal ausführen. Es ist ebenso möglich, dass Sie die Beam-Pipeline direkt, ohne Airflow, ausführen. Dazu können Sie einfach:
python beam_pipeline.py
ausführen. Stellen Sie sicher, dass der ESC-Microservice läuft und die Konfiguration auf die richtige URL zeigt.
Den ESC-Microservice können Sie ebenfalls lokal ausführen. Dazu starten Sie einfach den kompilierte Anwendung. Diese können Sie selber mit SBT bauen oder direkt von GitHub herunterladen. Dort finden Sie das Binary unter "Release". Für das Deployment auf der GCP finden Sie ein entsprechendes Docker-Image, welches Sie in in Cloud Run deployen können. Insbesondere für den Setup von Filestore im Zusammenhang mit Cloud Run weise ich nochmals auf diesen Artikel hin: https://ronnyfuchs.blogspot.com/2024/05/cloud-serie-gcp-filestore-vs-cloud.html
Das lokale Ausführen scheitert gelegentlich an fehlenden Berechtigungen für die Cloud-Komponenten (Cloud Storage und BigQuery). Stellen Sie sicher, dass die Bibliotheken über die notwendigen Berechtigungen verfügen und eine gültige Anmeldung vorhanden ist. Insbesondere für die Beam-Pipeline, da diese in einer eigenen virtuellen Umgebung von Python ausgeführt wird.
Damit sind wir am Ende dieses Artikels angekommen. Ich hoffe, Sie konnten den einen oder anderen Input zu Apache Airflow und die Kombination mit Apache Beam/Dataflow mitnehmen.
Keine Kommentare:
Kommentar veröffentlichen