Monday, September 16, 2024

BigQuery Vector Search

Vector search in Google's BigQuery has recently become generally available (GA). This is a good opportunity to take a look at vector search and its possibilities.

Editorial note: This article was first published in German. The image is not translated.

Vector comes from the Latin vector, which can be translated as carrier or conveyor. Vectors appear in various mathematical and physical subfields. Vectors in geometry are very similar to vectors in physics, but not identical. However, (n-)tuples, for example, are also sometimes referred to as vectors. What they all have in common is that addition and scalar multiplication (linear algebra) can be applied to them in a vector or tuple space; that is why they are all called vectors. Physical and many geometric vectors have a magnitude and a direction and are therefore often represented as an arrow. They represent, for example, a physical quantity such as velocity. Vectors in the sense of tuples are much more generic and can represent a wide range of information. While vectors in geometry and physics usually feel at home in spaces of 2 and 3 dimensions, “tuple vectors” often span high-dimensional spaces. Their visual representation is correspondingly demanding.

Vector calculus was established by the German mathematician Hermann Günther Grassmann in 1844. So, 180 years after the discovery of vectors, BigQuery offers vector-based search.


Vector search in BigQuery – a platform with building blocks


The term “vector search” may suggest an overly simplistic idea. It is not the case that you simply enter VECTOR “com” instead of LIKE “%com%” in BigQuery and BigQuery then performs a text-based vector search. Rather, vector search in BigQuery is a platform that can be used to perform a vector search. A vector search roughly consists of two components: the vectors themselves and the distance algorithm.

So, unlike geometric vectors, for example, the aim is not to move them in space, but to calculate the distance between vectors. The distance between the vectors must reflect their semantic similarity in order to deliver meaningful results for a search in the context of BigQuery. After all, the aim of the search is to find similar, semantically identical texts, images, videos, etc.

2 variants in BigQuery: with and without a model

As mentioned, vector search in BigQuery is a construction kit. The most minimal variant from BigQuery's perspective is to store the vectors in a table and to pass a vector as an Array<Float64> directly for the query. BigQuery then calculates the desired nearest candidates using the configured distance algorithm. Google has already done the first part, for example, in the public patent dataset. There is a new column “embeddings_v1”:

Google's embeddings in the patent dataset


We will come to the topic of embeddings in a moment. To let BigQuery know how you want the distance calculated, you have to define which distance algorithm should be used. BigQuery currently offers the following algorithms:

  • Euclidean distance
  • Cosine similarity
  • Dot product (scalar product)

You can also define additional parameters such as top_k (the number of neighbors you want to get back).
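To make this concrete, here is a minimal sketch of such a query using the Python BigQuery client. The dataset and table names are hypothetical, the query vector is heavily shortened, and the exact VECTOR_SEARCH syntax and output columns should be verified against the current documentation:

from google.cloud import bigquery

client = bigquery.Client()  # uses your default project and credentials

# Hypothetical dataset/table; the query vector is passed in as an ARRAY<FLOAT64>.
sql = """
SELECT base.publication_number, distance
FROM VECTOR_SEARCH(
  TABLE my_dataset.patent_embeddings, 'embeddings_v1',
  (SELECT [0.12, -0.43, 0.88] AS embeddings_v1),  -- query vector (shortened)
  top_k => 5,                                     -- number of nearest candidates
  distance_type => 'COSINE'                       -- or 'EUCLIDEAN', 'DOT_PRODUCT'
)
"""
for row in client.query(sql).result():
    print(row.publication_number, row.distance)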

For large amounts of data, it is worth additionally indexing the stored vectors. BigQuery offers special vector indices for this. This significantly increases query performance.
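As a rough sketch, such an index can be created with a DDL statement, here submitted via the Python client. The names are again hypothetical and the available OPTIONS (for example index_type and distance_type) should be checked against the documentation:

from google.cloud import bigquery

client = bigquery.Client()
client.query("""
CREATE VECTOR INDEX patent_embeddings_idx
ON my_dataset.patent_embeddings(embeddings_v1)
OPTIONS (index_type = 'IVF', distance_type = 'COSINE')
""").result()  # the index is then built and maintained by BigQuery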


Embeddings and the models

Now we come to the somewhat more difficult part. How do I get the vectors and how do the semantics get into these vectors?

So far, we have simply assumed that you have stored the vectors in BigQuery and specify a vector when making a query. This raises the question of how you obtain the vectors to store in the first place and which models are available to you.

Google provides you with over 20 different models in BigQuery:

The CREATE MODEL statement | BigQuery | Google Cloud

You can also use remote models. As far as I have seen, remote models currently means Vertex AI models. You define the desired model in Vertex AI, train it if necessary, and can then use it directly in BigQuery. With Vertex AI in particular, you basically have two options: you can calculate the vectors yourself directly in Vertex AI and then save them in BigQuery. To do this, you have to write some program code. This is the most minimal variant from BigQuery's point of view. Or you can integrate the models into BigQuery via remote models and then use the model directly in BigQuery with the BigQuery SQL dialect.
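A minimal sketch of the second option might look like this. The connection, dataset and endpoint names are hypothetical, and the exact CREATE MODEL and ML.GENERATE_EMBEDDING options (including the output column name) should be checked against the documentation:

from google.cloud import bigquery

client = bigquery.Client()

# Register a Vertex AI embedding endpoint as a remote model in BigQuery.
client.query("""
CREATE OR REPLACE MODEL my_dataset.embedding_model
  REMOTE WITH CONNECTION `my-project.us.my-vertex-connection`
  OPTIONS (ENDPOINT = 'text-multilingual-embedding-002')
""").result()

# Use the model directly in SQL to embed texts (for text embeddings the input
# column is expected to be named 'content', as far as I know).
sql = """
SELECT content, ml_generate_embedding_result
FROM ML.GENERATE_EMBEDDING(
  MODEL my_dataset.embedding_model,
  (SELECT title AS content FROM my_dataset.articles),
  STRUCT(TRUE AS flatten_json_output)
)
"""
for row in client.query(sql).result():
    print(row.content, len(row.ml_generate_embedding_result))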

Which brings us back to the question of what embeddings are and how semantics get into the vector.

It is not the case that simply placing words or letters as numbers in a multidimensional vector space automatically enables a semantic search. Rather, vectorizing semantic information helps the search by allowing the algorithms of linear algebra to be applied, and the efficiency of such algorithms also comes into play here. But it is equally not the case that by placing words or letters as numbers in a multidimensional vector space, vectors that lie closer together automatically become semantically more similar.

Before we look at how semantics are created based on textual information, it must be mentioned for the sake of completeness that vector search is not only suitable for texts. It can also be used to search images, audio and video files. How the semantics end up in the vector in such cases differs accordingly from the approach for texts. The specific procedure depends on the purpose. Do you want to find similar music, detect anomalies in sounds (e.g. for quality assurance) or recognize the language (only music with Italian lyrics)? Not every method is equally well suited to every purpose.


To put it simply, an embedding is the semantic positioning of a word or text in a defined vector space. Thus, an embedding is actually nothing more than a specific vector. A vector that represents the semantics (of texts). There are various methods and procedures for calculating embeddings, such as Word2Vec, GloVe or FastText. In addition, there are transformer-based methods. I wrote about transformers in this post here. I can only speculate, but I assume that Google uses a transformer-based solution in the Gemini embedding engine (link). Google currently offers 6 different embedding models, 4 English-language and 2 multilingual. The multilingual ones are:

  • textembedding-gecko-multilingual@001
  • text-multilingual-embedding-002

For vector search of texts in BigQuery, the embeddings from Gemini are therefore probably best suited as a vector. The embeddings from Gemini can also be used directly as a remote model in BigQuery.

Excursus Array: The representation of embeddings in BigQuery as Array<Float64> is nothing other than the matrix notation/representation of vectors.


Limits of similarity

Ultimately, embeddings are only as good as the training data and their appropriate use. For example, in the context of vehicles and escape, the names Bonnie & Clyde are close to each other. If you just want to know whether the name Bonnie is similar to the name Clyde, this is of course not very helpful. As an aside, names in general are extremely difficult for meaningful embeddings, since they are either strongly context-related (it is about this specific name right now) or not context-related at all (the name itself is not relevant). In addition, many names do not occur frequently in training data. Searching for similarities between names using embeddings is therefore unlikely to provide sufficiently robust results. Fortunately, there are special software solutions for this (such as ESC). For many texts, however, embeddings are very well suited to determining semantic similarity.

Google's BigQuery Vector Search documentation can be found here, among other places.


What does the possibility mean from an IT architecture perspective?

A new search function is nice, but what does it mean from an IT architecture perspective? It allows you to simplify the architecture. By being able to implement the function directly in BigQuery, you don't need any index pipelines to load and vectorize the data. And you don't need to write your own query or search logic, which you then have to enrich with data from BigQuery. You can do it directly with a query. This reduces the number of components, services and systems and thus simplifies your architecture overall. You can also reuse existing SQL/BigQuery know-how.


There is a slight risk, however, that your application will become a monolith. Try to modularize within BigQuery as well and work with interfaces for data models to reduce complexity. Make it possible to develop and customize the rest of the application independently of the vector search. And, of course, the other way around, too: you should be able to improve the vector search independently of the rest in an evolutionary way.


This is how you can get the most out of the integrated vector search.


Out of the box? No.

At the risk of repeating myself: vector search in BigQuery is a powerful construction kit and not a ready-made out-of-the-box function. To implement a powerful vector search in BigQuery, you need expertise in machine learning. You need to understand the different models and their parameters. You need to be able to convert the similarity search into a classification for evaluation and quality assurance purposes so that established evaluation metrics such as F1 can be applied. To do this, you need appropriate (classified) test data. Vector search in BigQuery is only worthwhile for you if you can use it to generate real business value. Google gives you computing power and a toolbox. To use it effectively in your own application, you have to roll up your sleeves and get to work.

Saturday, September 14, 2024

Outside in: How BigQuery Remote Functions influence the architecture

With a remote function, external functions can be integrated into BigQuery and executed directly from SQL code. This has a lot of charm, but it also has a direct influence on the architecture.

Starting point

Let's look at this using a concrete example. We have two tables which, in addition to a customer ID and other fields, both contain a name field. For each customer ID, we now want to compare the two names with regard to their similarity. We want to store the result in a third table:

Id   |  NameA  | NameB  | Similarity

Calculating the similarity is a complex process and cannot be done directly with SQL. We need a specific software library with a suitable algorithm for this.

We want to recreate the result table three times a day, i.e. every 8 hours.

We further assume that the two tables already exist, or rather that they are created by different applications at different times. The similarity can therefore not be calculated directly in the process that creates these tables.

For the similarity calculation, we use this software library: https://github.com/asderix/Esc

Solution variant A: One function with Cloud Scheduler

A classic architecture would be to write a function that performs the following tasks:
  • Include the ESC library
  • Connect to BigQuery
  • Query the tables using a JOIN
  • Iterate through the record set and have ESC calculate the similarity for each record
  • Write the results to the new table
For some of these tasks there are again several options that would require an architecture decision. For example, there are several ways to store data efficiently in BigQuery. For now, we will stay at a medium level of detail. This function is then called every 8 hours by a GCP Cloud Scheduler. The architecture would look like this:



If we really only need the similarity function once, in this use case, using it directly is appropriate. If we need it in more than one use case, encapsulating it in the sense of a microservice architecture makes sense. This brings us to solution variant B.
 
Solution variant B: Two functions with Cloud Scheduler

This solution variant splits functionality into microservices and therefore moves the similarity calculation into its own function. The rest of the main function remains largely unchanged:
  • Connect to BigQuery
  • Query the tables using a JOIN
  • Iterate through the record set and have the ESC service calculate the similarity for each record
  • Write the results to the new table
The ESC function, as an independent service, performs the following tasks:
  • Include the ESC library
  • Have ESC calculate the similarity of the inputs (names)
  • Return the similarity value
The main function is again called every 8 hours by a GCP Cloud Scheduler. The architecture would look like this:


The disadvantage of this variant is that we need two functions and, with Cloud Scheduler (as in variant A), have to use yet another GCP product. This brings us to solution variant C.

Solution variant C: One generic function with BigQuery

In this variant, we keep the generic similarity calculation function from variant B and do the rest directly in BigQuery. Accordingly, in BigQuery we do the following (see the sketch below):
  • Connect the similarity function as a remote function (a user-defined function, UDF, with a remote call)
  • Write a SQL query that joins the two tables and has the similarity value calculated directly via the function call
  • Schedule the saved query directly in BigQuery to run every 8 hours and specify that the result should be saved to a new table (overwriting it)
Scheduling a query in BigQuery is very easy:
Creating a scheduled query in the console
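To make the first two steps more concrete, here is a minimal sketch using the Python BigQuery client. The dataset, connection and Cloud Run endpoint are hypothetical, and the function signature depends on how the ESC service is actually exposed:

from google.cloud import bigquery

client = bigquery.Client()

# 1) Bind the Cloud Run service as a remote function (hypothetical names).
client.query("""
CREATE OR REPLACE FUNCTION demo_ds.name_similarity(name_a STRING, name_b STRING)
RETURNS FLOAT64
REMOTE WITH CONNECTION `my-project.europe-west6.esc-connection`
OPTIONS (endpoint = 'https://esc-service-xyz.a.run.app/similarity')
""").result()

# 2) Join the two tables and let the remote function score each name pair.
client.query("""
CREATE OR REPLACE TABLE demo_ds.name_similarity_result AS
SELECT a.id, a.name AS NameA, b.name AS NameB,
       demo_ds.name_similarity(a.name, b.name) AS Similarity
FROM demo_ds.table_a a
JOIN demo_ds.table_b b USING (id)
""").result()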

In this solution variant, the architecture looks like this:



The beauty of this variant is its simplicity. We have one generic function as a microservice, and we have implemented the rest of the logic directly in BigQuery with just a single query.

Outside in

You have surely noticed that the remote function in BigQuery inverts the architecture. Classically, the logic and control reside in an external application (function), and the database serves only as storage that is queried and then stores the results again. With the remote function, the central business logic now lives in BigQuery, and the specific function is pulled into BigQuery from the outside. This opens up new architecture patterns.

Which is better?

"It depends" sucks. But it does depend. However, if the actual function consists only of loading data, iterating over it record by record, applying some logic to it and then storing the results again, the outside-in variant is more elegant. Especially when the specific logic applied to the records is generic in nature and can be reused. Then you can save yourself a microservice and do not have to program business logic imperatively. A simple SQL query - even if it includes a remote function call - is usually easier to understand and easier to maintain.

As always, there is another side to the coin. Individual functions and saved queries, and scheduled queries in particular, can become hard to keep track of and require good (architecture) documentation. Make sure that you also modularize within BigQuery and treat tables and queries as interfaces, implementing them in as isolated a way as possible. Otherwise you run the risk of establishing a monolith here.

Regarding overview and documentation, I recommend defining such definitions in BigQuery with Terraform or another suitable IaC tool. Under no circumstances should you click such things together by hand in the console in production. With a well-structured IaC definition, you also keep outside in under control.

Implementation of solution variant C

I have implemented the microservice with ESC and the remote function in BigQuery and created a sample query. You can find the complete source code, including Terraform, as open source on GitHub: https://github.com/asderix/ESC-GCP-BQ-Remote-Function--Demo

The result of the query in BigQuery looks like this:


The values in the "Similarity" column come from the remote function, which calls the Cloud Run function that uses the ESC library.

Resource: When a remote function is used, BigQuery calls it with an HTTP POST containing a specific JSON body. You can find the JSON format of this call here.
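As a rough sketch, a Cloud Run / Cloud Functions handler for this contract could look like the following in Python. The "calls" and "replies" field names follow Google's documented request/response format; the similarity computation here is only a placeholder, not the real ESC call:

import functions_framework

@functions_framework.http
def name_similarity(request):
    # BigQuery sends {"calls": [[arg1, arg2], ...]}, one inner list per row,
    # and expects {"replies": [...]} with one result per call.
    calls = request.get_json()["calls"]
    replies = []
    for name_a, name_b in calls:
        # Placeholder scoring; the real service would call the ESC library here.
        replies.append(1.0 if name_a == name_b else 0.0)
    return {"replies": replies}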

Friday, September 13, 2024

Streaming with Apache Beam and GCP Dataflow - Transaction monitoring with ESC as stream (English version)


Editorial note: This article was first published in German in May 2024. The images were not translated.

In today's data-driven world, streaming applications have become essential for processing continuous streams of data in real time. However, the world of streaming can be complex and challenging, especially when it comes to developing robust and scalable solutions. Apache Beam is a powerful framework that facilitates the development of batch and streaming data pipelines. Apache Beam abstracts the underlying processing logic and allows developers to focus on the business logic instead of dealing with the details of the implementation. Despite the abstraction of the implementation from the actual execution logic, a certain understanding of the theoretical foundations of streaming applications is necessary to better understand Apache Beam and write better applications with it. For this reason, I will first present some theoretical aspects and application examples of streaming applications. After that, I will explain in more detail what Apache Beam is and how it can be used to program streaming and/or batch applications. Finally, I will show how to run an Apache Beam application in the Google Cloud Platform (GCP) in a highly available and scalable manner using Dataflow, and what possible use cases could be.

A bit of streaming theory

Streaming is used frequently in software development in a variety of contexts. Many people are familiar with reading files as a stream, reading a string as a stream or processing a known HTTP request as a stream. In all these cases, however, the “streams” are finite and more akin to batch processing, where a predetermined or well-known amount of data is processed. However, when we talk about streaming in the context of real-world streaming applications and frameworks like Apache Beam, it's a bit different. Here, streaming implies a theoretically infinite stream of data, with no way of knowing if or when it will stop. These streams are continuous and dynamic, meaning data can arrive at any time in an unpredictable order. In contrast to a stream from a file, which is ordered and closed, such continuous streams are often disordered, i.e. older data can appear later in the stream. This places increased demands on a processing system. This is especially true when real-time analyses are to be carried out from such a data stream. The requirements increase further if the processing of certain processing steps is also to be parallelized. This complexity must be kept in mind when wondering why a special framework like Apache Beam should be used for a streaming application at all.

Let's take a closer look at these challenges using a specific example. We want to display the 10 terms most frequently used in articles from traditional and social media in real time on our website. What seems trivial at first glance requires several components and decisions on closer inspection.

We need various gatherers to tap into the different data sources. These gatherers need to obtain new articles from the desired media portals and posts from the desired social media platforms such as X (formerly Twitter), Facebook, Instagram, etc. These gatherers provide the information asynchronously via Pub/Sub. This already raises the first question: does the gatherer already provide an extracted word list or the entire article? Our streaming application then subscribes to the corresponding Pub/Sub topic to receive the word lists or articles. The next question is: what time period should be covered? The 10 most frequently used terms in which time period? Once the hit list has been created, it has to be stored somehow. This leads to the next question: what does the streaming application do with the result? Does it store it somewhere in a database or does it make the result available in a Pub/Sub topic? Finally, the current list of results has to be sent to the website. This raises the question of how the update should be carried out. Is there a webhook and the list of results is pushed to the website, or does the website regularly poll a backend to request the current list of results? Questions upon questions and enough architecture material to write further articles about. At this point, we will concentrate on the actual streaming application and its challenges. We assume that the gatherers post entire articles in Pub/Sub and that the hit list should reflect the last 30 seconds. Our rough process could then look as follows:

  1. Pub/Sub message arrives
  2. Split the article into individual words
  3. Calculate the word frequencies of the article/post
  4. Update the current word frequencies across all articles/posts
  5. After a time window of 30 seconds, publish the current top 10 word frequencies to Pub/Sub (this is the solution we use here)

We are keeping it simple here. The second point could be developed further with the question of whether the words should be adopted 1:1 or whether it would not make more sense to normalize them first. For example, using a stemmer and additionally converting all letters to lower case. This normalization could be divided into further individual steps. We deliberately did not address the topic of the language of the articles/posts, as how to deal with it would certainly also need to be clarified. We now have a rough shared picture of how our streaming application works, so we can turn to the specific challenges.

With the general word frequency and the time window (30 seconds), we have already unconsciously made an important decision for the streaming application. Namely, the question of how we define the window. Let's look at the following example:

Fig. 1: Different types of information on the time axis

We have a stream with three types of information: A, B and C. For processing, we have two basic options: 1) each element individually, or 2) elements in groups. For our use case with word frequencies, option 1 is out of the question; that would be pointless, because then we would not have the frequencies across all articles, but only the frequencies of one article at a time. Within the basic option 2, however, there are several possibilities. We can fix the window by a certain time, for example our 30 seconds. But we can also define the window flexibly by type. A window would then last as long as elements of the same type keep arriving. So a window for A, B and C. The following figure shows this:

Fig. 2: Window by time (top) and by type (bottom) – of course there are other ways of separating the data, for example by number, but we won't go into that here


In the first case, we have a fixed window by time. That means that in the first window we have [A, A], in the second [B], in the third [C, A], in the fourth [B], in the fifth [B] and in the sixth [C]. In the second case, we have dynamic windows by type. That means in the first window [A, A], in the second [B], in the third [C], in the fourth [A], in the fifth [B, B] and in the sixth [C].

When we process information in information baskets in a streaming application, we have to define when such a basket ends. As we know, the stream is (theoretically) infinite. So we need criteria for when we have such a basket together in order to be able to process it as a whole. Which type of “separation” is best depends on the specific application. In our case, a fixed time window seems to be best. We want to treat all articles/posts equally and only make a statement about the current top 10 words. This brings us to another important distinguishing criterion for streaming applications and the challenge of how to deal with them: what time do we take? Remember: we have various gatherers that collect articles and posts from different sources. The timeline above shows when this information flows into our streaming system. We need to be aware that not all articles/posts that enter our system in a certain order are published in that order. This can depend on various factors: the gatherers work at different speeds. The media do not all publish the articles at the same speed, or it takes different amounts of time to receive the articles. Finally, the gatherers feed this information into Pub/Sub. Pub/Sub does not guarantee any particular order. So there is no guarantee that an article/post that was delivered to Pub/Sub first will actually be delivered to the subscriber before an article/post that was delivered to Pub/Sub later. The following figure shows this schematically:


Fig. 3: Arrival vs. publication time

When we think in terms of “streams”, we have to distinguish between the time at which an event occurred, in our case the time of publication, and the processing time, i.e. when the event arrives in the streaming system for processing. In the figure above, the seconds have been omitted for better readability and the differences are shown in minutes. If we now define a time window of 2 minutes, the baskets for word frequencies look like this:

Fig. 4: 3 baskets by publication time


The big question now is: when “B” with publication time 10:32 a.m. arrives, i.e. after the end of the first basket, how does the system know whether further articles/posts with a publication time between 10:30 a.m. and 10:31 a.m. will still arrive, and that the basket therefore cannot be closed yet? Please remember the term “watermark” at this point. We will come back to it later.

Another challenge that we can see is the necessary parallelism. We will simply assume here that we know that the first basket cannot be closed yet after “B” arrives at 10:32. We keep it open for the following “C” with publication time 10:30. However, we still have to process the “B” that arrived at 10:32. This means that at this point in time we have to process two baskets simultaneously.

So far, we have implicitly limited ourselves to word frequency when answering the question of how (simplified as one step). Of course, we can perform very different types of processing on the elements of a defined basket and also combine them. For example, we can filter them, combine them in pairs or group them. The following figure shows these possibilities:

Fig. 5: Various processing options


So we define the window that determines the contents of the basket, the what, and how we process such a basket, the how. This is important because, depending on the algorithm, parallelizing the processing in this step may be possible to a greater or lesser extent, or not at all.

In this article, we will limit ourselves to these questions and challenges of streaming systems (you could fill books with them) and summarize our findings so far: In an endless stream of events, we need to define when we can make a “cut” in order to process a part of the stream as a unit. We can define this “cut” in different ways. A temporal definition is particularly difficult if it is not the point in time of entry into the streaming system that is relevant, but the original point in time of the event outside the streaming system. This can lead to situations in which we have to keep several “cuts” open at the same time and assign the incoming events to the corresponding basket. There are different ways to process a basket. Depending on the requirements, we can filter elements (if we only need certain elements), group elements (if we need group-specific processing), or form pairs (if we can form key-value pairs, for example).

A bit of history

Before we look at how one or the other question and challenge was answered or solved in Apache Beam, it's time for a bit of history. Why? For two reasons: 1) It helps to understand where Apache Beam comes from and what it has to do with Apache Spark, Apache Flink and Google's Dataflow. 2) We all stand on the shoulders of giants. The fact that we can now implement robust streaming systems relatively easily and abstractly is thanks to many people who have been working on this and related topics for years. These are people in the background that hardly anyone knows. I don't think it would be appropriate to write about how easy it is to write great streaming applications without at least acknowledging them as a cohort.

MapReduce (2003): Google released the MapReduce framework, which revolutionized large-scale data processing. It provided a simple programming model based on two main functions, Map and Reduce, and enabled the distribution of tasks across many nodes in a cluster.

Hadoop (2004): Apache Hadoop was developed by Doug Cutting (who also wrote Apache Lucene, by the way) and Mike Cafarella and is based on the principles of MapReduce. Hadoop brought the idea of distributed storage systems and a distributed data processing platform to the open source world, enabling many companies to process large amounts of data efficiently.

Flume (2007): Apache Flume was developed to collect, aggregate and move large amounts of streaming data in real time. It was developed primarily for collecting log data in distributed systems.

MillWheel (2007): Google MillWheel is a framework that enables highly scalable and reliable processing of streaming processes. MillWheel introduced concepts such as strict consistency, low latency and exactly-once processing. These concepts had a major influence on the later development of Google Dataflow.

Side note: So far, we have found that events can sometimes arrive late. We had not yet dealt with what happens if events arrive late and the corresponding basket has already been processed. Furthermore, with all the definitions and parallel processing, we have intuitively assumed that each event is logically processed only once. “C” at 10:30 a.m. lands in exactly one basket and is processed exactly once. What seems normal to us was not always the case and is not always trivial in detail. In MillWheel, the concept of “Exactly one matters” has found a fitting place and important theoretical foundations have been created that still apply and are used today.

Apache Spark (2009): Apache Spark was developed to address the limitations of MapReduce. It offers faster in-memory processing and supports both batch and streaming data processing. Spark's extensibility and ability to create complex data processing workflows quickly made it popular.

Apache Flink (2010): Flink was developed to process data streams in real time. It provides a high-performance framework for low-latency, high-throughput streaming and batch data processing.

Apache Storm (2011): Storm enabled real-time data processing with low latency and high fault tolerance. Developed by Twitter, Storm was widely adopted in scenarios requiring rapid data processing.

Google Dataflow (2014): Dataflow was another step in the evolution of data processing at Google, based on the experience with MapReduce and MillWheel. Dataflow offers a flexible and scalable model for batch and streaming processing and was introduced as a fully managed service on the Google Cloud Platform. We will come back to Dataflow in more detail.

Apache Beam (2016): Apache Beam originated from Google Dataflow and brought the concept of a unified programming environment for batch and streaming data processing into the open source world. Apache Beam provides an SDK that is supported by various runtime environments, including Google Dataflow, Apache Spark, Apache Flink and others. It abstracts the underlying processing logic and allows developers to write their pipelines once and run them anywhere.

Furthermore, there are other important developments in the ecosystem of streaming applications. Here, Apache Kafka and Google Pub/Sub certainly deserve a mention. Both enable streaming applications to be supplied with data asynchronously from different sources.

As can be seen, it has been a rather long journey from initial concepts such as MapReduce to robust and relatively easy-to-implement streaming applications in terms of the generally rapid technological development in IT.

Abstraction with Apache Beam

Dataflow from Google had two strong components. Firstly, the runtime environment, generally referred to as the “runner”. Secondly, an SDK. This SDK implemented a unified programming model that provided the greatest possible abstraction of the underlying processes. For example, developers do not have to worry about how to robustly implement a time window. They simply define the desired time window with the SDK. The executing component, the runner, takes care of the corresponding implementation. Google Dataflow had a local runner for development and testing purposes, i.e. a simplified runtime environment that a developer could use to quickly test the behavior of their application locally.

In line with its philosophy, Google then decided to transfer the Dataflow SDK and the local runner to the Apache Foundation as open source software. This was the birth of Apache Beam. The GCP product Dataflow is now a fully managed, robust and highly scalable execution platform for streaming applications developed with Apache Beam. After the SDK became open source, other runtime environments for streaming applications also implemented Apache Beam-compatible “runners”. This means that the same application can be executed not only in Dataflow, but also in Apache Spark or Apache Flink, for example. I don't want to go into too much detail at this point. Just this much: there are differences in the implementations that can affect the behavior of the application.

Before we take a look at the Apache Beam abstraction model, I need to clarify something. So far, we have always been talking about streaming applications, and I explained at the beginning that there is a difference to classic batch processing. That is still true. However, if we understand how streaming applications work, we can also see batch processing as simply a special variant of a streaming application. Just one that is clearer and more predictable. And that is exactly how Apache Beam was developed. It can be used to implement batch and streaming applications in the same way. In the Apache Beam environment, this is often referred to as “bounded” and “unbounded” data. Bounded data is batch processing and unbounded data is stream processing. This has the major advantage that you can use the same technology. In my experience, a product or company usually has more batch processes than real streaming applications. If you implement these in Apache Beam, you have enough know-how for the rarer real streaming applications.

The Apache Beam programming model basically consists of a few simple but important components/objects (not exhaustive):

Program flow

Pipeline:

The pipeline is the central object that describes the entire data processing. It contains all PCollections and PTransforms and thus defines the data processing flow, from the input of the data to the completion.

PCollection:

PCollection stands for “parallel collection”. The PCollection is a generic data structure in the Apache Beam programming model that represents a distributed, immutable collection of data representations. A PCollection can contain any number of elements of any data type. PCollections are passed as inputs to the transformation steps (PTransforms) and each PTransform in turn has at least one PCollection as a return value. Since these are immutable, a new PCollection object is created when changes occur during a transformation step.

PTransform:

PTransform stands for “parallel transformation” and is an abstraction that represents a data processing operation on one or more PCollections. PTransforms describe the processing logic and can range from simple operations such as filtering and mapping to more complex aggregations. PTransforms can be created in a number of ways: using pre-built standard transformations such as GroupByKey, Combine, etc. and using custom transformations. Each PTransform takes one or more PCollections as input and generates one or more PCollections as output. PTransform objects thus define how the data of the PCollection(s) is transformed.

Settings/configurations

Window:

Windows define time intervals or other criteria according to which elements in a PCollection are grouped. There are various standard windowing strategies: Fixed Windows, Sliding Windows or Session Windows. It is possible to implement your own strategies.

Fixed Windows:

The windows replace each other seamlessly, there is no overlap. When the first window ends, the next window begins. You simply define the time duration, e.g. 5 minutes.

Sliding Windows:

The windows usually overlap, depending on the configuration. You define the time duration and the interval. For example, 5 minutes and 1 minute. Every minute, a window opens that lasts 5 minutes. This means that an element that appears at 4 minutes will be included in windows 1, 2, 3 and 4. You may have already noticed that the concept of “Exactly one matters” has a different meaning here than you might have assumed when you first read the concept. Exactly one refers to the windows. When do you need this strategy? The strategy is particularly useful in scenarios where you want to calculate trends. For example, you might want to output a trend for each minute of the last five minutes.

Session Windows:

Session windows define a grouping and an inactivity duration. The window size is therefore variable. Session windows are useful if you want to group a logical unit of data. A typical example is a user interaction in a web application. You want to analyze the behavior but only capture actions as a unit if they occur in close temporal proximity. For example, if the user does not click for 5 minutes, the session closes and the next user actions are analyzed in a new session.
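In the Beam Python SDK, the three standard strategies described above roughly look as follows (durations in seconds); `events` stands for an assumed, already existing PCollection:

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, SlidingWindows, Sessions

fixed   = events | "Fixed"   >> beam.WindowInto(FixedWindows(5 * 60))                    # seamless 5-minute windows
sliding = events | "Sliding" >> beam.WindowInto(SlidingWindows(size=5 * 60, period=60))  # 5-minute windows, opening every minute
session = events | "Session" >> beam.WindowInto(Sessions(gap_size=5 * 60))               # closes after 5 minutes of inactivity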

Watermark:

In my experience, it is important to use the correct translation for this term in German. Do not associate watermark with the German term “Wasserzeichen”, but with the terms “Wasserstand” or “Pegelstand” (water level). The term has little to do with watermarks on banknotes or stationery.

Watermarks are a central component of the underlying concept of Apache Beam. They provide a balance between latency and completeness of data processing. In this context, a watermark is not a fixed level. Rather, it provides an estimate: an estimate of the time T at which all expected data will have arrived. Remember that in batch processing it is clear when all elements have arrived, but not in a stream. Take another look at Figure 4. How does the stream process know that a “C” at 10:30 will arrive after a “B” at 10:32? It's simple. It estimates, using a watermark, when – time T – all the elements of the first window will have arrived. It is sometimes, but not always, important to find a balance between short latency, i.e. the need to deliver the results and close the window as quickly as possible, and completeness, i.e. the need to take into account as many of the associated elements as possible.

It would be going too far to go into the calculation of watermarks in detail, and the advantage is that a developer usually does not have to deal with the implementation. In principle, it is a matter of heuristics that are calculated on the basis of various data from the source. Strictly speaking, Apache Beam does not calculate watermarks. The runner does that. Incidentally, this is one of the differences between runners such as Apache Spark, Apache Flink and Dataflow. Not every runner calculates the watermarks the same way for each source. Therefore, identical Apache Beam source code can produce different results. For example, the Google Dataflow team has tried to implement an optimized watermark calculation for the Pub/Sub source.

Trigger:

Triggers determine when the collected elements in a window are processed and the results emitted, i.e. when the window is closed. There are four standard triggers:

AfterWatermark:

Fires as soon as the watermark indicates that the point in time has been reached at which all events should have arrived. This is the most common trigger for time-controlled windows, especially when there should be a balance between latency and completeness.

AfterProcessingTime:

Fired based on the processing time after a certain delay. So the watermark is not used. The time is the time at which the events arrive. For example, a trigger of 5 minutes is fired 5 minutes after the arrival of the first event. This trigger is useful when processing time (latency) is weighted higher than completeness. Regardless of whether events are still arriving, after 5 minutes (in our example) the window is closed; events arriving later are handled in a new window.

AfterPaneElementCount:

This is triggered when a certain number of elements have been collected in a window. Time is therefore not a factor with this trigger. If the trigger is set to 10, it is triggered after 10 events have been received. This trigger is useful if you want to process a fixed number of events together.

Repeatedly:

This trigger repeats an underlying trigger and fires accordingly whenever the underlying trigger fires after a defined period (“delay”). Suppose we have an AfterProcessingTime trigger of 5 minutes. This fires 5 minutes after the first event occurs. With the Repeatedly trigger, we can now repeat this trigger every 5 minutes (delay = 5), for example. So 5 minutes after the first firing, the trigger is triggered again. This trigger can be useful if you need regular updates, for example in real-time dashboards.
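As a sketch in the Python SDK (where the element-count trigger described above is called AfterCount), a repeated processing-time trigger on a global window could look like this; `events` is again an assumed PCollection:

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.trigger import (
    Repeatedly, AfterProcessingTime, AccumulationMode)

dashboard_panes = events | "Repeat every 5 min" >> beam.WindowInto(
    GlobalWindows(),
    trigger=Repeatedly(AfterProcessingTime(5 * 60)),  # re-fires every 5 minutes of processing time
    accumulation_mode=AccumulationMode.DISCARDING)    # emit only newly arrived elements each time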


Timestamp:

Timestamps are assigned to the elements in a PCollection and represent the time at which the event occurred.

Program logic/data manipulation

GroupByKey:

GroupByKey groups elements of a PCollection by key and creates a new PCollection in which all values with the same key are grouped together.


Combine:

Combine is an aggregation on a PCollection. This can be used to perform standard aggregations such as sum or arithmetic mean, or custom aggregations. The result is a new PCollection.

Flatten:

Flatten combines multiple PCollections into a single PCollection. This can be used, for example, to merge multiple incoming PCollections into one and pass it as output to a next step, which then executes an aggregation function (Combine) on it, for example.

Partition:

Partition is the opposite of Flatten and divides a PCollection into several PCollections.


Filter:

As the name suggests, this PTransform can be used to filter individual elements from a PCollection.


Read/Write:

The name says it all. Read and Write are PTransforms that can be used to read data from a source or to write data to a target.

ParDo:

ParDo is a PTransform that applies user-defined processing to each element of a PCollection. As the name suggests (“Par” stands for parallel), it enables parallel processing of data. The user-defined processing is defined in a DoFn.

DoFn:

DoFn stands for “Do Function”. A DoFn is used to implement custom processing of an element of a PCollection. The DoFn is called by the ParDo, which applies the DoFn to each element of the PCollection.
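A minimal sketch of a custom DoFn applied with ParDo, for example to normalize words as mentioned earlier; `words` is an assumed PCollection of strings:

import apache_beam as beam

class NormalizeWord(beam.DoFn):
    def process(self, word):
        # Lower-case the word; a stemmer could be plugged in here as well.
        yield word.lower()

normalized = words | "Normalize" >> beam.ParDo(NormalizeWord())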


External libraries:

There are numerous third-party libraries that implement Apache Beam interfaces and provide functions. Examples include Google's Pub/Sub or BigQuery reader, which make it easy to use Pub/Sub messages or BigQuery queries as event sources for an Apache Beam pipeline.


A picture is worth a thousand words. Let's take a look at the written theory of the model in a simplified graphical representation:


Fig. 6: Simplified representation of a pipeline with PCollections and PTransforms.

What is striking is that we have a pipeline and PCollections and PTransforms. But where are Window, Watermark and Trigger? Or GroupByKey, ParDo etc.? Window, Watermark (if used) and Trigger sit at the beginning, at/after “Create”. They define which specific items in the PCollection are to be used for the respective instance of the pipeline. The actual pipeline from “item” onwards is executed in parallel, depending on the configuration and data. Let's look at this graphically as well:


Fig. 7: Window, Trigger and Watermark control the “start” of the pipeline instance


GroupByKey, Combine, Flatten, Filter etc. are standard PTransforms (green boxes) that can easily be used in the pipeline by configuration. In our case, for example, the words of the PCollection could be grouped in one step with GroupByKey. ParDo with a DoFn could be used if you want to normalize the individual words (keywords: stemmer, upper/lower case, etc.).


There is one question we have so far skillfully ignored. What happens if “C” from 10:30 a.m. is late and, despite a good watermark, the window is already closed and the PCollection for this period has already been emitted? In Apache Beam, there are basically two solutions to this problem. The first is the “allowedLateness” option as a window property. This can be used to define for how long late events are still accepted. With a setting of 2 minutes, events up to 2 minutes late are still processed for the window they belong to; later ones are ignored. The second option is often the better and correspondingly the more frequently used one: the list of “triggers” above is not complete. There is also a “late trigger”. This is triggered for all events that are late, and you can define when or how often it should be executed (e.g. after time or a number of such events). This allows you to handle these late events separately. The two options can also be combined.

Now we'll soon have everything we need to take a look at our first pipeline program. But before that, we need to take a brief look at the basics of the Apache Beam SDK. At this point, it should be noted that the introduction to Apache Beam in this article is only a rough overview and is intended as motivation to take a closer look at this framework. Within the scope of this article, we can only scratch the surface. When “we” is mentioned, it is mainly me as the author who is meant. Back to the SDK. What programming language do you actually program Apache Beam in? The Apache Project Apache Beam officially maintains SDKs for the following programming languages:

  • Java
  • Python
  • Go
  • TypeScript

In addition, Spotify offers an official SDK for the Scala programming language:

Furthermore, the Apache Beam project offers a YAML interface. This allows Apache Beam pipelines to be created purely declaratively, without programming. This is particularly suitable for simple standardized pipelines that get by with the standard transformers and do not need their own logic. Documentation for the Apache Beam SDKs can be found here:

The question remains as to which language is best for writing Apache Beam applications. In my opinion, it is best to use the language that you know best or that is most commonly used for similar purposes. I will write the following examples in Python because I believe that the Apache Beam source code is most intuitive to read in Python.


Let's look at a specific pipeline in Python. Here, we will focus directly on implementing the pipeline. We will leave out the body of a complete program. Python leaves a lot of leeway for how you want to design the “trappings” with configuration, functions, swapping to your own classes, etc. There are many complete code examples. The official ones can be found here:


Fig. 8: Code snippet of a simple Apache Beam pipeline in Python

Before we look at the code in sequence, here is an explanation of the two most important overloaded Python operators: | and >>. These allow a pipeline to be written very readably in Python. The bitwise OR operator (“|”) is used in the Apache Beam SDK for chaining steps. The bitwise right shift operator (“>>”) is used to connect the name/title of a step with its implementation. A Java/Python comparison makes this easier to understand if necessary:

Java: p.apply(“Name/Title”, PTransform...)

Python: p | “Name/Title” >> PTransform...


Let's first get a rough idea of the code based on the familiar graphical representation:


Fig. 9: Mapping code to graphical representation

  1. beam.Pipeline p opens the pipeline definition (yellow area)
  2. ReadFromPubSub is the “Create”, the source of the data
  3. WindowInto is the window function
  4. FlatMap is the first PTransformer of the actual business logic
  5. Map is here the second PTransformer of the business logic

The pipeline in the code can be clearly defined from top to bottom in the flow. Let's take a closer look at the individual PTransformer functions:


Decoding articles:

This map function is used to decode the messages from Pub/Sub as UTF-8 text. It is important to ensure a defined encoding because we then apply string operations (regex) to the text. This is a purely technical step.

Aggregate in fixed time windows:

We use WindowInto to define a fixed time window of one minute. The parameter is in seconds and is easier to read when written as 1 * 60. In a real application, this should be implemented as a parameter and not hard-coded.

Splitting words:

Here we pass the FlatMap function an anonymous function that splits the texts into individual words. To do this, we use a regex. All words without special characters are extracted. In this step, or an additional one, further normalizations could be carried out (keywords: stemmer, upper/lower case).

Form pairs:

Here we form pairs with the map function in preparation for the subsequent counting of words (the word is the key).


Group by words and sum:

The CombinePerKey function aggregates the key (in our case the word) and the values are summed. This gives us the number of words.

Formatting the result:

This map function for formatting is only used for a nice display.


Encoding the result:

Pub/Sub, or rather the function we use, expects bytes, while we work with strings internally. We ensure UTF-8 again and then return the result as bytes.


Publishing the result in Pub/Sub:

Finally, we publish the formatted result in Pub/Sub again.
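Since Fig. 8 is only included as an image, here is a rough sketch of what such a pipeline could look like in Python, reconstructed from the step descriptions above; it is not the exact code from the figure, and the project, subscription and topic names are hypothetical:

import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

options = PipelineOptions(streaming=True)  # Pub/Sub sources require streaming mode

with beam.Pipeline(options=options) as p:
    (p
     | "Read articles"      >> beam.io.ReadFromPubSub(subscription="projects/my-project/subscriptions/articles-sub")
     | "Decode articles"    >> beam.Map(lambda msg: msg.decode("utf-8"))
     | "Fixed time windows" >> beam.WindowInto(FixedWindows(1 * 60))
     | "Split words"        >> beam.FlatMap(lambda text: re.findall(r"\w+", text))
     | "Form pairs"         >> beam.Map(lambda word: (word, 1))
     | "Group and sum"      >> beam.CombinePerKey(sum)
     | "Format result"      >> beam.Map(lambda kv: f"{kv[0]}: {kv[1]}")
     | "Encode result"      >> beam.Map(lambda line: line.encode("utf-8"))
     | "Publish result"     >> beam.io.WriteToPubSub(topic="projects/my-project/topics/word-frequencies"))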

So far, so good. You may be wondering where the trigger, the watermark or all the PCollections are in the code. Good questions. Trigger: If we use a FixedWindow, we don't need an explicit trigger. The trigger is implicit and is triggered after a defined period of time, in our case every 60 seconds. Watermark: Since we have a FixedWindow and the timestamp refers to the entry into the pipeline (we have not defined an external timestamp), we do not need a watermark. In our example, we could use a watermark as a trigger and also accept late data at the same time. This could be implemented as follows:


Fig. 10: Example with watermark as trigger and allowed lateness


In the example above, a GlobalWindows was used instead of a FixedWindows and AfterWatermark was used as the explicit trigger. The last accumulation mode used is for cleanup.


PCollections are passed implicitly. The function beam.Map(lambda x: (x, 1)) returns a PCollection. The Runner passes this to the following function beam.CombinePerKey(sum) for us. We do not need to program this transfer explicitly.


This brings us to the end of our introduction to streaming applications in general and the basics of Apache Beam as a unified programming model for developing streaming applications.


Efficient and scalable execution with Dataflow

An Apache Beam application must be executed by a runner. The Apache Beam SDK includes a local runner for this purpose, but this is not intended for productive use. A secure, scalable and managed runner is provided by Google with Dataflow. Deploying an Apache Beam application in Dataflow is extremely easy and not much more complicated than running it locally:


Fig. 11: Apache Beam deployment on Dataflow

This is an example for Python. Make sure that the Apache Beam SDK is installed. You can then execute the deployment directly.
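As a sketch, the switch from local execution to Dataflow essentially amounts to setting the pipeline options accordingly (the names are hypothetical):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner="DataflowRunner",       # instead of the local DirectRunner
    project="my-project",          # hypothetical GCP project id
    region="europe-west6",
    temp_location="gs://my-bucket/tmp",
    streaming=True)
# Passing these options to beam.Pipeline(options=options) and running the
# script submits the job to Dataflow.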


It goes without saying that, depending on the application and landing zone, further preparatory work may be necessary. If you are using Pub/Sub, you may need to set up the corresponding topics. You will also be asked to activate numerous GCP APIs if they are not already activated. You can find complete instructions for these steps here, for example:

Real-time transaction monitoring as a stream

At least in Switzerland, the Embargo Act and its ordinances, among other things, effectively oblige banks to check the senders and recipients of payments against “sanction lists”. Depending on the risk assessment, such checks are carried out against further personal and entity databases. These checks should be carried out before the actual execution of the payment. If the payment is executed first, this may already constitute a violation. From a technical point of view, this means that the transaction check must be carried out “in real time”. This requirement can be met by various architecture patterns. Each has its advantages and disadvantages, which need to be weighed and considered. I would like to briefly introduce three different architecture patterns and discuss their advantages and disadvantages.


Synchronous in one step:

In this model, the actual payment transaction function synchronously calls the name-matching function in its own process. It waits for the result and then continues processing.


Advantages:

This pattern has the advantage that the process logic is in one place and the function can be tested relatively easily. With unit tests against a mock service, the function can already be tested quite comprehensively.

Disadvantages:

The disadvantages of this pattern are mainly in the performance, and possibly in the costs and maintainability. Performance: The payment application has to wait for the response. This means that if there are a large number of payments, many more instances of the function have to be available. This requires more RAM and CPU, or there is a risk that the entire application will slow down. Costs: Depending on the implementation and operation of the name-matching function, higher costs may be incurred. This is especially true when the function is operated with a model such as Cloud Functions, where the number of calls has a direct influence on the costs. Maintainability: The payment function itself has to implement all error handling, re-try mechanisms, etc. This turns the application into a monolith that becomes more difficult to maintain over time and increases the complexity of changes.


Decoupling with pub/sub:

In this pattern, the process is decoupled. The payment application performs the previous checks such as credit checks, etc. At the point of name checking, it stores the status and publishes the payment in Pub/Sub. This triggers the name matching function, which performs the check, stores the status and, in the positive case, triggers the payment application via Pub/Sub to execute the payment.

Advantages:

With this pattern, we keep the payment application lean and easy to maintain. Performance remains virtually unchanged, and publishing in pub/sub requires few resources. During the name matching check, the payment application can use its resources for other payments.

Disadvantages:

The potential cost disadvantage of the first pattern remains, since each check is executed individually. Compared to the first pattern, this solution requires more testing. The test environment is more complex, the unit tests are limited to smaller units, and the informative value is correspondingly lower.

Transaction testing as a stream:

In this pattern, the transaction test is considered as a stream. Payments are continuously arriving and need to be checked. As in the previous pattern, the payment application publishes the payment in pub/sub. However, the consumer of this pub/sub topic is not a function, but a stream application. This application now processes the payments in defined windows. Ideally in fixed sizes, e.g. always 10 transactions together and a maximum time span. So after 10 transactions or after 30 seconds, whichever comes first. Depending on the capabilities of the payment application, the stream application triggers the payment application per payment or in batch via pub/sub for execution.

Advantages:

With this pattern, we can call the name matching function in batches. This means we can have 10 payments checked at once, for example. This reduces the network load and the number of function calls. The advantages of the previous pattern remain.


Disadvantages:

As with the previous solution, the testing effort for this variant is higher compared to the first solution, and the informative value of the individual unit tests is more limited.

We almost have all the tools for a corresponding Apache Beam application. The only thing missing is the combined window function. We always want to combine 10 transactions. However, if there are not that many transactions, we don't want to wait long; after all, the transactions should be executed quickly. That's why we defined the second condition: a maximum of 30 seconds after the first transaction arrives in the new window. This means that in the worst case, the start of a check takes 30 seconds, namely if no more than 8 further transactions arrive afterwards. For this we can use the familiar global window with its own triggers. Here we can combine various triggers with “AfterAny”, which links them with a logical OR. Our window definition could look like the following:


Fig. 12: Combined window with Repeatedly and AfterAny
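Reconstructed from the description above (it is not the exact code from Fig. 12), such a window definition could look like this in the Python SDK; `payments` is an assumed PCollection of incoming payments:

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.trigger import (
    Repeatedly, AfterAny, AfterCount, AfterProcessingTime, AccumulationMode)

batched = payments | "Batch payments" >> beam.WindowInto(
    GlobalWindows(),
    # Fire after 10 elements or after 30 seconds, whichever comes first,
    # and keep re-firing for the following batches.
    trigger=Repeatedly(AfterAny(AfterCount(10), AfterProcessingTime(30))),
    accumulation_mode=AccumulationMode.DISCARDING)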

Let's move on to the name-matching part. In a payment, we usually do not have a structured name from the recipient. This means that we often have a complete address in a field. This may well include several people. For example:


Mr. and Mrs.
Hans and Erika Müller-Meier

Musterstrasse 24

8001 Zurich

Switzerland

We cannot rely on a well-structured complete address. It could also look like this:


Mr. Hans and Erika Müller-Meier

Muster Street 24, CH-8001 Zurich


This is where the ESC function “findByAddress” comes into play. We can pass a complete address as a string to this function to search for names in the index. We do not need to worry about the structure of the address.

For the batch check, we can extend our sample application from this Cloud Series article: Running an Apache ESC index application serverless with Cloud Run to include an endpoint that takes a list of strings (addresses) with associated transaction numbers as input. This checks all of them and returns the result:


Fig. 13: Code extension with a new Rest-API endpoint – checking a list of addresses




Fig. 14: Local query of the new Rest-API endpoint

With this practical example, I hereby conclude the brief introduction to streaming applications with Apache Beam and their execution in Dataflow. I hope you were able to take away some architectural inspiration.

Further links: