Friday, September 13, 2024

Streaming with Apache Beam and GCP Dataflow - Transaction monitoring with ESC as stream (English version)

Editorial note: This article was first published in German in May 2024. The images were not translated.

In today's data-driven world, streaming applications have become essential for processing continuous streams of data in real time. However, streaming can be complex and challenging, especially when it comes to developing robust and scalable solutions. Apache Beam is a powerful framework that facilitates the development of batch and streaming data pipelines. It abstracts the underlying processing logic and lets developers focus on the business logic instead of the details of the implementation. Despite this abstraction, a certain understanding of the theoretical foundations of streaming applications is necessary to understand Apache Beam and write better applications with it. For this reason, I will first present some theoretical aspects and application examples of streaming applications. After that, I will explain in more detail what Apache Beam is and how it can be used to program streaming and/or batch applications. Finally, I will show how to run an Apache Beam application on Google Cloud Platform (GCP) in a highly available and scalable manner using Dataflow, and what possible use cases could be.

A bit of streaming theory

Streaming is used frequently in software development in a variety of contexts. Many people are familiar with reading files as a stream, reading a string as a stream or processing a known HTTP request as a stream. In all these cases, however, the “streams” are finite and more akin to batch processing, where a predetermined or well-known amount of data is processed. However, when we talk about streaming in the context of real-world streaming applications and frameworks like Apache Beam, it's a bit different. Here, streaming implies a theoretically infinite stream of data, with no way of knowing if or when it will stop. These streams are continuous and dynamic, meaning data can arrive at any time in an unpredictable order. In contrast to a stream from a file, which is ordered and closed, such continuous streams are often disordered, i.e. older data can appear later in the stream. This places increased demands on a processing system. This is especially true when real-time analyses are to be carried out from such a data stream. The requirements increase further if the processing of certain processing steps is also to be parallelized. This complexity must be kept in mind when wondering why a special framework like Apache Beam should be used for a streaming application at all.

Let's take a closer look at these challenges using a specific example. We want to display the 10 terms most frequently used in articles from traditional and social media in real time on our website. What seems trivial at first glance requires several components and decisions on closer inspection.

We need various gatherers to tap into the different data sources. These gatherers need to obtain new articles from the desired media portals and posts from the desired social media platforms such as X (formerly Twitter), Facebook, Instagram, etc. The gatherers provide the information asynchronously via Pub/Sub. This already raises the first question: does the gatherer provide an extracted word list or the entire article? Our streaming application then subscribes to the corresponding Pub/Sub topic to receive the word lists or articles. The next question is: what time period should be covered? The 10 most frequently used terms in which time period? Once the hit list has been created, it has to be stored somehow. This leads to the next question: what does the streaming application do with the result? Does it store it in a database or does it make the result available in a Pub/Sub topic? Finally, the current list of results has to be sent to the website. This raises the question of how the update should be carried out. Is there a webhook through which the list of results is pushed to the website, or does the website regularly poll a backend to request the current list of results? Questions upon questions and enough architecture material to write further articles about. At this point, we will concentrate on the actual streaming application and its challenges. We assume that the gatherer posts entire articles to Pub/Sub and that the hit list should cover the last 30 seconds. Our rough process could then look as follows:

  1. Pub/Sub message arrives
  2. Split the article into individual words
  3. Calculate the word frequencies of the article/post
  4. Update the current word frequencies across all articles/posts
  5. After a time frame of 30 seconds, publish the current top 10 word frequencies to Pub/Sub (we are using this solution)
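Steps 2 to 5 can be sketched in plain Python, without any streaming framework. This is only a minimal illustration under simplifying assumptions: the function names and the sample articles are hypothetical, words are taken over 1:1, and all articles of one 30-second time frame are assumed to be available as a list already.

```python
import re
from collections import Counter


def split_words(article):
    """Step 2: split an article into individual words (no normalization)."""
    return re.findall(r"\w+", article)


def article_frequencies(article):
    """Step 3: word frequencies of a single article/post."""
    return Counter(split_words(article))


def top_terms(articles, n=10):
    """Steps 4 and 5: merge the per-article counts and return the n most frequent terms."""
    total = Counter()
    for article in articles:
        total.update(article_frequencies(article))
    return total.most_common(n)


# Hypothetical articles that arrived within one 30-second time frame.
window_articles = [
    "Beam makes streaming simple",
    "streaming with Beam and Dataflow",
    "Beam",
]
print(top_terms(window_articles, n=2))  # [('Beam', 3), ('streaming', 2)]
```

The hard part of the real application is exactly what this sketch leaves out: deciding which articles belong to a time frame when the stream never ends.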

We are keeping it simple here. The second step could be refined with the question of whether the words should be taken over 1:1 or whether it would make more sense to normalize them, for example by applying a stemmer and converting all letters to lower case. This normalization could be divided into further individual steps. We deliberately did not address the language of the articles/posts, although how to deal with it would certainly also need to be clarified. We now have a rough shared picture of how our streaming application works, so we can turn to the specific challenges.

With the general word frequency and the time window (30 seconds), we have already unconsciously made an important decision for the streaming application. Namely, the question of how we define the window. Let's look at the following example:

Fig. 1: Different types of information on the time axis

We have a stream with three types of information: A, B and C. For processing, we have two basic options: 1) Each element individually. 2) Elements in groups. For our use case with word frequencies, option 1 is out of the question; that would be pointless, because then we would not have the frequencies across all articles, but only the frequencies of one article at a time. With option 2, however, there are several possibilities. We can close the window after a fixed time, for example our 30 seconds. But we can also define the window flexibly by type. Such a window lasts as long as elements of the same type keep arriving, so there are separate windows for A, B and C. The following figure shows this:

Fig. 2: Window by time (top) and by type (bottom) – of course there are other ways of separating the data, for example by number, but we won't go into that here


In the first case, we have a fixed window by time. That means that in the first window we have [A, A], in the second [B], in the third [C, A], in the fourth [B], in the fifth [B] and in the sixth [C]. In the second case, we have dynamic windows by type. That means in the first window [A, A], in the second [B], in the third [C], in the fourth [A], in the fifth [B, B] and in the sixth [C].
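The two kinds of windows can be reproduced with a few lines of plain Python. This is a sketch only: the arrival times below are hypothetical values chosen so that the result matches the window contents listed above, and a window length of 5 time units is an assumption.

```python
from itertools import groupby

# Hypothetical (arrival_time, type) pairs, chosen to mirror the example above.
events = [(1, "A"), (3, "A"), (7, "B"), (11, "C"), (13, "A"), (17, "B"), (22, "B"), (27, "C")]


def windows_by_time(events, size):
    """Fixed windows: an event at time t belongs to window number t // size."""
    windows = {}
    for t, kind in events:
        windows.setdefault(t // size, []).append(kind)
    return [windows[i] for i in sorted(windows)]


def windows_by_type(events):
    """Dynamic windows: a window lasts as long as consecutive events share a type."""
    return [list(group) for _, group in groupby(kind for _, kind in events)]


print(windows_by_time(events, size=5))
# [['A', 'A'], ['B'], ['C', 'A'], ['B'], ['B'], ['C']]
print(windows_by_type(events))
# [['A', 'A'], ['B'], ['C'], ['A'], ['B', 'B'], ['C']]
```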

When we process information in baskets in a streaming application, we have to define when such a basket ends. As we know, the stream is (theoretically) infinite. So we need criteria for when a basket is complete so that we can process it as a whole. Which type of “separation” is best depends on the specific application. In our case, a fixed time window seems to be best. We want to treat all articles/posts equally and only make a statement about the current top 10 words. This brings us to another important distinguishing criterion for streaming applications and the challenge of how to deal with it: which time do we take? Remember: we have various gatherers that collect articles and posts from different sources. The timeline above shows when this information flows into our streaming system. We need to be aware that articles/posts do not necessarily enter our system in the order in which they were published. This can depend on various factors: the gatherers work at different speeds. The media do not all publish their articles at the same speed, or it takes different amounts of time to receive the articles. Finally, the gatherers feed this information into Pub/Sub. Pub/Sub does not guarantee any particular order. So there is no guarantee that an article/post that was delivered to Pub/Sub first will actually be delivered to the subscriber before an article/post that was delivered to Pub/Sub later. The following figure shows this schematically:


Fig. 3: Arrival vs. publication time

When we think in terms of “streams”, we have to distinguish between the time at which an event occurred (in our case the time of publication) and the time of processing, i.e. when the event arrives in the streaming system. In the figure above, the seconds have been omitted for better readability and the differences are shown in minutes. If we now define a time window of 2 minutes, the baskets for word frequencies look like this:

Fig. 4: 3 baskets by publication time
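The assignment of events to baskets by publication time, independent of arrival order, can be sketched as follows. The events are hypothetical (the figure's exact values are not reproduced here); the list order stands for the arrival order, and the function name is an assumption.

```python
from collections import defaultdict


def to_minutes(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def to_hhmm(total_minutes):
    """Convert minutes since midnight back to 'HH:MM'."""
    return f"{total_minutes // 60:02d}:{total_minutes % 60:02d}"


def baskets_by_publication_time(events, size_minutes=2):
    """Assign each event to the basket that covers its publication time."""
    baskets = defaultdict(list)
    for published, kind in events:  # iteration order = arrival order
        start = to_minutes(published) // size_minutes * size_minutes
        baskets[to_hhmm(start)].append(kind)
    return dict(sorted(baskets.items()))


# Hypothetical events in arrival order: (publication time, type).
arrivals = [("10:30", "A"), ("10:32", "B"), ("10:30", "C"),
            ("10:33", "A"), ("10:35", "B"), ("10:34", "C")]
print(baskets_by_publication_time(arrivals))
# {'10:30': ['A', 'C'], '10:32': ['B', 'A'], '10:34': ['B', 'C']}
```

Note that “C” lands in the 10:30 basket although it arrives after “B” from 10:32. The sketch can do this only because it sees the complete list, which a streaming system never does.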


The big question now is: How does the system know that after the end of the first basket, i.e. when “B” with publication time 10:32 a.m. has arrived, further articles/posts with a publication time between 10:30 a.m. and 10:31 a.m. will arrive, so the basket cannot yet be closed? Please remember the term “watermark” at this point. We will come back to this later.

Another challenge that we can see is the necessary parallelism. We will simply assume here that we know that the first basket will not be closed after “B” arrives at 10:32. We consider it open for the following “C” at 10:30. However, we have to process “B” that arrived at 10:32. This means that at this point in time we have to process two baskets simultaneously.

So far, we have implicitly limited ourselves to word frequency when answering the question of how (simplified as one step). Of course, we can perform very different types of processing on the elements of a defined basket and also combine them. For example, we can filter them, combine them in pairs or group them. The following figure shows these possibilities:

Fig. 5: Various processing options


So we define the window that determines the contents of a basket (the what) and the way we process such a basket (the how). This matters because, depending on the algorithm, processing in this step can be parallelized well, only to a limited extent, or not at all.

In this article, we will limit ourselves to these questions and challenges of streaming systems (you could fill books with them) and summarize our findings so far: In an endless stream of events, we need to define when we can make a “cut” in order to process a part of the stream as a unit. We can define this “cut” in different ways. A temporal definition is particularly difficult if it is not the point in time of entry into the streaming system that is relevant, but the original point in time of the event outside the streaming system. This can lead to situations in which we have to keep several “cuts” open at the same time and assign the incoming events to the corresponding basket. There are different ways to process a basket. Depending on the requirements, we can filter elements (if we only need certain elements), group elements (if we need group-specific processing), or form pairs (if we can form key-value pairs, for example).

A bit of history

Before we look at how one or the other question and challenge was answered or solved in Apache Beam, it's time for a bit of history. Why? For two reasons: 1) It helps to understand where Apache Beam comes from and what it has to do with Apache Spark, Apache Flink and Google's Dataflow. 2) We all stand on the shoulders of giants. The fact that we can now implement robust streaming systems relatively easily and abstractly is thanks to many people who have been working on this and related topics for years. These are people in the background that hardly anyone knows. I don't think it would be appropriate to write about how easy it is to write great streaming applications without at least acknowledging them as a cohort.

MapReduce (2003): Google released the MapReduce framework, which revolutionized large-scale data processing. It provided a simple programming model based on two main functions, Map and Reduce, and enabled the distribution of tasks across many nodes in a cluster.

Hadoop (2004): Apache Hadoop was developed by Doug Cutting (who also wrote Apache Lucene, by the way) and Mike Cafarella and is based on the principles of MapReduce. Hadoop brought the idea of distributed storage systems and a distributed data processing platform to the open source world, enabling many companies to process large amounts of data efficiently.

Flume (2007): Apache Flume was developed to collect, aggregate and move large amounts of streaming data in real time. It was developed primarily for collecting log data in distributed systems.

MillWheel (2007): Google MillWheel is a framework that enables highly scalable and reliable processing of data streams. MillWheel introduced concepts such as strong consistency, low latency and exactly-once processing. These concepts had a major influence on the later development of Google Dataflow.

Side note: So far, we have found that events can sometimes arrive late. We have not yet dealt with what happens if events arrive late and the corresponding basket has already been processed. Furthermore, with all the definitions and parallel processing, we have intuitively assumed that each event is logically processed only once. “C” at 10:30 a.m. lands in exactly one basket and is processed exactly once. What seems normal to us was not always the case and is not always trivial in detail. In MillWheel, the principle of exactly-once processing found a fitting home, and important theoretical foundations were created that still apply and are used today.

Apache Spark (2009): Apache Spark was developed to address the limitations of MapReduce. It offers faster in-memory processing and supports both batch and streaming data processing. Spark's extensibility and ability to create complex data processing workflows quickly made it popular.

Apache Flink (2010): Flink was developed to process data streams in real time. It provides a high-performance framework for low-latency, high-throughput streaming and batch data processing.

Apache Storm (2011): Storm enabled real-time data processing with low latency and high fault tolerance. Developed by Twitter, Storm was widely adopted in scenarios requiring rapid data processing.

Google Dataflow (2014): Dataflow was another step in the evolution of data processing at Google, based on the experience with MapReduce and MillWheel. Dataflow offers a flexible and scalable model for batch and streaming processing and was introduced as a fully managed service on the Google Cloud Platform. We will come back to Dataflow in more detail.

Apache Beam (2016): Apache Beam originated from Google Dataflow and brought the concept of a unified programming environment for batch and streaming data processing into the open source world. Apache Beam provides an SDK that is supported by various runtime environments, including Google Dataflow, Apache Spark, Apache Flink and others. It abstracts the underlying processing logic and allows developers to write their pipelines once and run them anywhere.

Furthermore, there are other important developments in the ecosystem of streaming applications. Here, Apache Kafka and Google Pub/Sub certainly deserve a mention. Both enable streaming applications to be supplied with data asynchronously from different sources.

As can be seen, measured against the generally rapid pace of technological development in IT, it has been a rather long journey from initial concepts such as MapReduce to robust and relatively easy-to-implement streaming applications.

Abstraction with Apache Beam

Dataflow from Google had two strong components. Firstly, the runtime environment, generally referred to as the “runner”. Secondly, an SDK. This SDK implemented a unified programming model that provided the greatest possible abstraction of the underlying processes. For example, developers do not have to worry about how to robustly implement a time window. They simply define the desired time window with the SDK. The executing component, the runner, takes care of the corresponding implementation. Google Dataflow had a local runner for development and testing purposes, i.e. a simplified runtime environment that developers could use to quickly test the behavior of their application locally. In line with its philosophy, Google then decided to transfer the Dataflow SDK and local runner to the Apache Software Foundation as open source software. This was the birth of Apache Beam. The GCP product Dataflow is now a fully managed, robust and highly scalable execution platform for streaming applications developed with Apache Beam. After the SDK became open source, other runtime environments for streaming applications also implemented Apache Beam-compatible runners. This means that the same application can be executed not only in Dataflow, but also in Apache Spark or Apache Flink, for example. I don't want to go into too much detail at this point. Just this much: there are differences in the implementation that can affect the behavior of the application.

Before we take a look at the Apache Beam abstraction model, I need to clarify something. So far, we have always been talking about streaming applications, and I explained at the beginning that there is a difference to classic batch processing. That is still true. However, if we understand how streaming applications work, we can also imagine that batch processing is simply a special variant of a streaming application. Just one that is clearer and more predictable. And that is exactly how Apache Beam was developed. It can be used to implement batch and streaming applications in the same way. In the Apache Beam environment, this is often referred to as “bounded” and “unbounded” data. Bounded data corresponds to batch processing and unbounded data to stream processing. This has the major advantage that you can use the same technology for both. In my experience, a product or company usually has more batch processes than real streaming applications. If you implement these in Apache Beam, you have enough know-how for the rarer real streaming applications.
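The idea that the same logic can serve bounded and unbounded data can be illustrated with a plain Python generator. This is only an analogy, not Apache Beam code; the function and variable names are hypothetical.

```python
from itertools import count, islice


def word_lengths(words):
    """The processing logic, written once and unaware of whether the input ends."""
    for word in words:
        yield len(word)


bounded = ["beam", "dataflow"]                # finite: we know when it ends
unbounded = (f"event-{i}" for i in count())   # infinite: never ends on its own

batch_result = list(word_lengths(bounded))
# For the unbounded input we can only ever look at a part of the results.
stream_sample = list(islice(word_lengths(unbounded), 3))

print(batch_result)   # [4, 8]
print(stream_sample)  # [7, 7, 7]
```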

The Apache Beam programming model basically consists of a few simple but important components/objects (not exhaustive):

Program flow

Pipeline:

The pipeline is the central object that describes the entire data processing. It contains all PCollections and PTransforms and thus defines the data processing flow, from the input of the data to the completion.

PCollection:

PCollection stands for “parallel collection”. The PCollection is a generic data structure in the Apache Beam programming model that represents a distributed, immutable collection of data elements. A PCollection can contain any number of elements of any data type. PCollections are passed as inputs to the transformation steps (PTransforms), and each PTransform in turn has at least one PCollection as a return value. Since PCollections are immutable, a transformation step never changes its input but creates a new PCollection object.

PTransform:

PTransform stands for “parallel transformation” and is an abstraction that represents a data processing operation on one or more PCollections. PTransforms describe the processing logic and can range from simple operations such as filtering and mapping to more complex aggregations. PTransforms can be created in a number of ways: using pre-built standard transformations such as GroupByKey, Combine, etc. and using custom transformations. Each PTransform takes one or more PCollections as input and generates one or more PCollections as output. PTransform objects thus define how the data of the PCollection(s) is transformed.

Settings/configurations

Window:

Windows define time intervals or other criteria according to which elements in a PCollection are grouped. There are various standard windowing strategies: Fixed Windows, Sliding Windows or Session Windows. It is possible to implement your own strategies.

Fixed Windows:

The windows follow one another seamlessly without overlap. When the first window ends, the next window begins. You simply define the duration, e.g. 5 minutes.

Sliding Windows:

The windows usually overlap, depending on the configuration. You define the duration and the interval, for example 5 minutes and 1 minute. Every minute, a window opens that lasts 5 minutes. This means that an element that arrives during the fourth minute is included in windows 1, 2, 3 and 4. You may have already noticed that “exactly once” has a different meaning here than you might have assumed when you first read about the concept: an element can end up in several windows, and exactly once applies per window. When do you need this strategy? The strategy is particularly useful in scenarios where you want to calculate trends. For example, you might want to output a trend over the last five minutes every minute.
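Which sliding windows an element falls into can be computed directly. The following is a plain Python sketch, not the Apache Beam implementation; the function name is hypothetical, times are in seconds, and it assumes that the stream starts at time 0, so no window starts earlier.

```python
def sliding_window_starts(timestamp, size, period):
    """Return the start times of all sliding windows that contain `timestamp`.

    Windows start at 0, period, 2 * period, ... and each lasts `size` seconds.
    Assumption: the stream starts at time 0, so no window starts before that.
    """
    last_start = (timestamp // period) * period
    starts = range(last_start, timestamp - size, -period)
    return sorted(start for start in starts if start >= 0)


# An element 3.5 minutes into the stream, windows of 5 minutes opening every minute.
print(sliding_window_starts(210, size=300, period=60))  # [0, 60, 120, 180]

# Later in the stream, every element is part of size / period = 5 windows.
print(sliding_window_starts(600, size=300, period=60))  # [360, 420, 480, 540, 600]
```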

Session Windows:

Session windows define a grouping and an inactivity duration. The window size is therefore variable. Session windows are useful if you want to group a logical unit of data. A typical example is a user interaction in a web application. You want to analyze the behavior but only capture actions as a unit if they occur in close temporal proximity. For example, if the user does not click for 5 minutes, the session closes and the next user actions are analyzed in a new session.
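The session logic can be sketched in plain Python for the events of a single user. This is a simplified illustration and not how a runner implements it; the function name and the click times are hypothetical.

```python
def session_windows(timestamps, gap):
    """Group timestamps into sessions that are separated by at least `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # still within the inactivity gap: same session
        else:
            sessions.append([ts])    # gap exceeded (or first event): new session
    return sessions


# Hypothetical click times of one user in seconds, inactivity gap of 5 minutes.
clicks = [0, 60, 200, 520, 530, 900]
print(session_windows(clicks, gap=300))
# [[0, 60, 200], [520, 530], [900]]
```

With several users, the same grouping would be applied separately per user key.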

Watermark:

In my experience, it is important to use the correct translation for this term in German. Do not associate watermark with the German term “Wasserzeichen”, but with the terms “Wasserstand” or “Pegelstand”. The term has little to do with watermarks on banknotes or stationery. Watermark is a central component of the underlying concept of Apache Beam. Watermarks provide a balance between latency and completeness of data processing. In this context, a watermark is not a fixed level. Rather, it provides an estimate. An estimate of the time T at which all expected data will have arrived. Remember that in batch processing it is clear when all elements have arrived, but not in a stream. Take another look at Figure 4. How does the stream process know that a “C” at 10:30 will arrive after a “B” at 10:32? It's simple. It estimates, using a watermark, when – time T – all the elements of the first window will have arrived. It is sometimes, but not always, important to find a balance between a short latency, i.e. the need to deliver the results as quickly as possible and close the window as quickly as possible, and the need for completeness, i.e. to take into account as many of the associated elements as possible. It would be going too far to go into the calculation of watermarks in detail, and the advantage is that a developer usually does not have to deal with the implementation. In principle, it is a matter of heuristics that are calculated on the basis of various data from the source. Strictly speaking, Apache Beam does not calculate watermarks. The runner does that. Incidentally, this is one of the differences between different runners such as Apache Spark, Apache Flink and Dataflow. Not every runner calculates the watermarks the same for each source. Therefore, identical Apache Beam source code can produce different results. For example, the Google Dataflow team has tried to implement an optimized watermark calculation for the Pub/Sub source.
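To make the idea tangible, here is a deliberately naive watermark heuristic in plain Python. It is an assumption made for illustration only and not how Dataflow or any other runner calculates watermarks: the watermark is simply the largest event time seen so far minus a fixed expected delay, and a window is emitted as soon as the watermark has passed its end. All names and values are hypothetical.

```python
def run_with_watermark(events, window_size, max_delay):
    """Process (event_time, value) pairs in arrival order with a naive watermark.

    Naive assumption: watermark = largest event time seen so far - max_delay.
    """
    open_windows, closed, emitted, late = {}, set(), [], []
    max_seen = None
    for event_time, value in events:
        start = event_time // window_size * window_size
        if start in closed:
            late.append((event_time, value))  # its window has already been emitted
        else:
            open_windows.setdefault(start, []).append(value)
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
        watermark = max_seen - max_delay
        for window_start in sorted(open_windows):
            if window_start + window_size <= watermark:
                emitted.append((window_start, open_windows.pop(window_start)))
                closed.add(window_start)
    return emitted, late, open_windows


# Event times in minutes after 10:30, listed in arrival order (hypothetical).
arrivals = [(0, "A"), (2, "B"), (0, "C"), (3, "A"), (5, "B"), (1, "D")]
emitted, late, still_open = run_with_watermark(arrivals, window_size=2, max_delay=2)
print(emitted)     # [(0, ['A', 'C'])]
print(late)        # [(1, 'D')]
print(still_open)  # {2: ['B', 'A'], 4: ['B']}
```

A larger `max_delay` would have caught “D” as well, but the first window would have been emitted later. This is the trade-off between latency and completeness described above.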

Trigger:

Triggers determine when the collected elements in a window are processed and the results emitted, i.e. when the window is closed. There are four standard triggers:

AfterWatermark:

Fired as soon as the point in time according to Watermark has been reached at which all events should have arrived. This is the most common trigger for time-controlled windows, especially when there should be a balance between latency and completeness.

AfterProcessingTime:

Fired based on the processing time after a certain delay. So the watermark is not used. The time is the time at which the events arrive. For example, a trigger of 5 minutes is fired 5 minutes after the arrival of the first event. This trigger is useful when processing time (latency) is weighted higher than completeness. Regardless of whether events are still arriving, after 5 minutes (in our example) the window is closed; events arriving later are handled in a new window.

AfterPaneElementCount:

This is triggered when a certain number of elements have been collected in a window. Time is therefore not a factor with this trigger. If the trigger is set to 10, it is triggered after 10 events have been received. This trigger is useful if you want to process a fixed number of events together. In the SDKs, this trigger is available as AfterPane.elementCountAtLeast (Java) and AfterCount (Python).

Repeatedly:

This trigger repeats an underlying trigger and fires whenever the underlying trigger fires again after a defined period (“delay”). Suppose we have an AfterProcessingTime trigger of 5 minutes. This fires 5 minutes after the first event occurs. With the Repeatedly trigger, we can now repeat this trigger every 5 minutes (delay = 5), for example. So 5 minutes after the first firing, the trigger fires again. This trigger can be useful if you need regular updates, for example in real-time dashboards.


Timestamp:

Timestamps are assigned to the elements in a PCollection and represent the time at which the event occurred.

Program logic/data manipulation

GroupByKey:

GroupByKey groups elements of a PCollection by key and creates a new PCollection in which all values with the same key are grouped together.


Combine:

Combine is an aggregation on a PCollection. This can be used to perform standard aggregations such as sum or arithmetic mean, or custom aggregations. The result is a new PCollection.
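What GroupByKey and a per-key Combine do with key-value pairs can be shown with plain Python equivalents. These are hypothetical helper functions for illustration, not the Apache Beam transforms themselves, and they work on ordinary lists instead of PCollections.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect all values that share a key, like GroupByKey does."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_per_key(pairs, combine_fn):
    """Aggregate the values of each key with combine_fn, e.g. sum."""
    return {key: combine_fn(values) for key, values in group_by_key(pairs).items()}


pairs = [("beam", 1), ("flink", 1), ("beam", 1)]
print(group_by_key(pairs))          # {'beam': [1, 1], 'flink': [1]}
print(combine_per_key(pairs, sum))  # {'beam': 2, 'flink': 1}
```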

Flatten:

Flatten combines multiple PCollections into a single PCollection. This can be used, for example, to merge multiple incoming PCollections into one and pass it as output to a next step, which then executes an aggregation function (Combine) on it, for example.

Partition:

Partition is the opposite of Flatten and divides a PCollection into several PCollections.
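Flatten and Partition can be pictured with two small plain Python functions. They are hypothetical stand-ins operating on lists; the partition function receives the element and the number of partitions and returns the index of the target partition.

```python
from itertools import chain


def flatten(*collections):
    """Merge several collections into a single one."""
    return list(chain.from_iterable(collections))


def partition(collection, n, partition_fn):
    """Split a collection into n collections; partition_fn picks the target index."""
    parts = [[] for _ in range(n)]
    for element in collection:
        parts[partition_fn(element, n)].append(element)
    return parts


words = ["a", "bb", "ccc", "dd"]
even_length, odd_length = partition(words, 2, lambda word, n: len(word) % n)
print(even_length)                       # ['bb', 'dd']
print(odd_length)                        # ['a', 'ccc']
print(flatten(even_length, odd_length))  # ['bb', 'dd', 'a', 'ccc']
```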


Filter:

As the name suggests, this PTransform can be used to filter individual elements from a PCollection.


Read/Write:

The name says it all. Read and Write are PTransforms that can be used to read data from a source or to write data to a target.

ParDo:

ParDo is a PTransform that applies user-defined processing to each element of a PCollection. As the name suggests (“Par” stands for parallel), it enables parallel processing of data. The user-defined processing is defined in a DoFn.

DoFn:

DoFn stands for “Do Function”. A DoFn is used to implement custom processing of an element of a PCollection. The DoFn is called by the ParDo, which applies the DoFn to each element of the PCollection.
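The relationship between ParDo and DoFn can be mimicked in a few lines of plain Python. The classes below are toy stand-ins that merely borrow the names for readability; they are not the Apache Beam classes, and the toy `par_do` runs sequentially.

```python
class DoFn:
    """Toy stand-in: custom logic for a single element goes into process()."""

    def process(self, element):
        raise NotImplementedError


class NormalizeWord(DoFn):
    """Hypothetical example: trim and lowercase a word, drop empty results."""

    def process(self, element):
        word = element.strip().lower()
        if word:
            yield word  # a DoFn may emit zero, one or several outputs per element


def par_do(collection, do_fn):
    """Toy stand-in for ParDo: apply the DoFn to every element independently."""
    return [output for element in collection for output in do_fn.process(element)]


print(par_do([" Beam", "DATAFLOW ", "  "], NormalizeWord()))  # ['beam', 'dataflow']
```

Because each element is processed independently of all others, a real runner is free to distribute this work across many workers.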


External libraries:

There are numerous third-party libraries that implement Apache Beam interfaces and provide functions. Examples include Google's Pub/Sub or BigQuery reader, which make it easy to use Pub/Sub messages or BigQuery queries as event sources for an Apache Beam pipeline.


A picture is worth a thousand words. Let's take a look at the written theory of the model in a simplified graphical representation:


Fig. 6: Simplified representation of a pipeline with PCollections and PTransforms.

What is striking is that we have a pipeline, PCollections and PTransforms. But where are Window, Watermark and Trigger? Or GroupByKey, ParDo etc.? Window, Watermark (if used) and Trigger are at the beginning at/after “Create”. They define which specific items in the PCollection are to be used for the respective instance of the pipeline. The actual pipeline from “item” onwards is executed in parallel, depending on the configuration and data. Let's look at this graphically as well:


Fig. 7: Window, Trigger and Watermark control the “start” of the pipeline instance


GroupByKey, Combine, Flatten, Filter etc. are standard PTransforms (green boxes) that can be used in the pipeline simply by configuration. In our case, for example, the words of the PCollection could be grouped in one step with GroupByKey. ParDo with a DoFn could be used if you want to normalize the individual words (e.g. stemming, lowercasing).


There is one question we have so far skillfully ignored. What happens if “C” with publication time 10:30 a.m. arrives late and, despite a good watermark, the window is already closed and the PCollection for this period has already been emitted? In Apache Beam, there are basically two mechanisms for this problem. The first is the “allowedLateness” option as a window property. This can be used to define how long late events are accepted. Such events are still assigned to their original window, which can then emit an updated result. With a setting of 2 minutes, events up to 2 minutes late are still processed. Later ones are ignored. The second mechanism controls what happens with these late events: the list of triggers above is not complete. There is also a “late trigger”. This is triggered for events that are late, and you can define when or how often it should be executed (e.g. after a certain time or number of such events). This allows you to handle these late events separately. The two mechanisms are combined in practice: a late trigger only takes effect for events that are still accepted within allowedLateness.
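The effect of an allowed lateness can be sketched as a simple classification in plain Python. This is a simplified model for illustration, not Apache Beam code; the function name, the returned labels and the exact boundary handling are assumptions.

```python
def classify(event_time, watermark, window_size, allowed_lateness):
    """Classify an arriving event relative to the watermark (all values in seconds)."""
    window_end = (event_time // window_size + 1) * window_size
    if watermark < window_end:
        return "on time"               # the window of this event is still open
    if watermark < window_end + allowed_lateness:
        return "late, still accepted"  # within the allowed lateness
    return "ignored"                   # too late, the event is no longer processed


# An event with event time 30 s, 2-minute windows, 2 minutes of allowed lateness.
for watermark in (100, 180, 300):
    print(watermark, classify(30, watermark, window_size=120, allowed_lateness=120))
# 100 on time
# 180 late, still accepted
# 300 ignored
```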

Now we'll soon have everything we need to take a look at our first pipeline program. But before that, we need to take a brief look at the basics of the Apache Beam SDK. At this point, it should be noted that the introduction to Apache Beam in this article is only a rough overview and is intended as motivation to take a closer look at this framework. Within the scope of this article, we can only scratch the surface. When “we” is mentioned, it is mainly me as the author who is meant. Back to the SDK. What programming language do you actually program Apache Beam in? The Apache Beam project officially maintains SDKs for the following programming languages:

  • Java
  • Python
  • Go
  • TypeScript

In addition, Spotify offers an official SDK for the Scala programming language:

Furthermore, the Apache Beam project offers a YAML interface. This allows Apache Beam pipelines to be created purely declaratively, without programming. This is particularly suitable for simple standardized pipelines that get by with the standard transforms and do not need their own logic. Documentation for the Apache Beam SDKs can be found here:

The question remains as to which language is best for writing Apache Beam applications. In my opinion, it is best to use the language that you know best or that is most commonly used for similar purposes. I will write the following examples in Python because I believe that the Apache Beam source code is most intuitive to read in Python.


Let's look at a specific pipeline in Python. Here, we will focus directly on implementing the pipeline. We will leave out the body of a complete program. Python leaves a lot of leeway for how you want to design the “trappings” with configuration, functions, swapping to your own classes, etc. There are many complete code examples. The official ones can be found here:


Fig. 8: Code snippet of a simple Apache Beam pipeline in Python

Before we look at the code in sequence, here is an explanation of the two most important overloaded Python operators: | and >>. These make a pipeline very readable in Python. The bitwise OR operator (“|”) is used in the Apache Beam SDK to chain steps. The bitwise right shift operator (“>>”) is used to connect the name/title of a step with its implementation. A Java/Python comparison makes this easier to understand if necessary:

Java: p.apply(“Name/Title”, PTransform...)

Python: p | “Name/Title” >> PTransform...
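To make the mechanics tangible, here is a minimal sketch in plain Python, without Apache Beam. The classes Step and Data are purely hypothetical stand-ins that I made up for illustration; they only mimic the idea of how such operators can be overloaded. The important detail is that “>>” binds more tightly than “|”, so the label is attached to the step first and the labelled step is then applied.

```python
class Step:
    """Hypothetical stand-in for a transform: wraps a function and a label."""

    def __init__(self, fn, label="unnamed"):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # Called for: "label" >> step (str itself does not implement >>).
        return Step(self.fn, label)


class Data:
    """Hypothetical stand-in for a collection flowing through the steps."""

    def __init__(self, items, applied=()):
        self.items = list(items)
        self.applied = tuple(applied)

    def __or__(self, step):
        # Called for: data | step. Returns a new object, so calls can be chained.
        return Data(step.fn(self.items), self.applied + (step.label,))


result = (
    Data([1, 2, 3])
    | "Double" >> Step(lambda xs: [x * 2 for x in xs])
    | "Total" >> Step(lambda xs: [sum(xs)])
)
print(result.applied, result.items)
```

Each “|” returns a new object, which is why the steps can be written one below the other as a readable chain.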


Let's first get a rough idea of the code based on the familiar graphical representation:


Fig. 9: Mapping code to graphical representation

  1. beam.Pipeline p opens the pipeline definition (yellow area)
  2. ReadFromPubSub is the “Create”, the source of the data
  3. WindowInto is the window function
  4. FlatMap is the first PTransform of the actual business logic
  5. Map is the second PTransform of the business logic

The pipeline can be read from top to bottom in the code, following the flow of the data. Let's take a closer look at the individual PTransforms:


Decoding articles:

This map function decodes the bytes received from Pub/Sub into UTF-8 text. A defined encoding is important because we subsequently apply string operations (regex) to the text. This is a purely technical step.

Aggregate in fixed time windows:

We use WindowInto to define a fixed time window of one minute. The parameter is given in seconds and is easier to read as 1 * 60. In a real application, this value should be configurable and not hard-coded.
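What “fixed window” means for a single element can be illustrated with a few lines of plain Python. This is only a simplified model of the idea, not Beam's implementation; the function name fixed_window is my own, and I assume windows aligned to multiples of the window size.

```python
from typing import Tuple


def fixed_window(timestamp: float, size: int = 1 * 60) -> Tuple[float, float]:
    """Return the [start, end) interval of the fixed window containing timestamp.

    Assumption: windows are aligned to multiples of `size` seconds.
    """
    start = timestamp - (timestamp % size)
    return (start, start + size)


# An element with timestamp 125 s falls into the window from 120 s to 180 s.
print(fixed_window(125))
```

Every element belongs to exactly one such window, and all later aggregations are computed per window.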

Splitting words:

Here we pass the FlatMap function an anonymous function that splits the texts into individual words using a regular expression. All words without special characters are extracted. In this step, or in an additional one, further normalizations could be carried out, for example stemming or conversion to a uniform upper or lower case.

Form pairs:

Here we form pairs with the map function in preparation for the subsequent counting of words (the word is the key).


Group by words and sum:

The CombinePerKey function groups the pairs by key (in our case the word) and sums the values. This gives us the number of occurrences of each word within the window.
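The semantics of “form pairs, then combine per key” can be reproduced in plain Python. The helper combine_per_key below is a hypothetical model that I wrote for illustration; it is not the Beam function, which additionally works per window and in parallel.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


def combine_per_key(
    pairs: Iterable[Tuple[str, int]], combine_fn: Callable[[List[int]], int]
) -> Dict[str, int]:
    """Group values by key and reduce each group with combine_fn."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: combine_fn(values) for key, values in grouped.items()}


words = ["beam", "stream", "beam"]
pairs = [(word, 1) for word in words]  # corresponds to the "form pairs" step
counts = combine_per_key(pairs, sum)   # corresponds to CombinePerKey(sum)
print(counts)
```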

Formatting the result:

This map function for formatting is only used for a nice display.


Encoding the result:

Pub/Sub, or rather the function we use to publish, expects bytes, while we work with strings internally. We therefore encode the result as UTF-8 again and return it as bytes.


Publishing the result in Pub/Sub:

Finally, we publish the formatted result in Pub/Sub again.
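The steps described above could be put together roughly as follows. This is a sketch under assumptions, not a copy of the code in the figure: the step labels, the regular expression, the output format and the topic parameters are my own choices, and I use named functions instead of lambdas so that the business logic can be tested without a runner. Apache Beam is only imported inside run(), so the helper functions work without the SDK installed.

```python
import re
from typing import List, Tuple


def decode(message: bytes) -> str:
    """Decode the Pub/Sub payload into UTF-8 text."""
    return message.decode("utf-8")


def split_words(text: str) -> List[str]:
    """Extract words; the pattern is an assumption for this sketch."""
    return re.findall(r"[A-Za-z']+", text)


def format_result(word_count: Tuple[str, int]) -> str:
    """Format a (word, count) pair for display."""
    word, count = word_count
    return f"{word}: {count}"


def encode(line: str) -> bytes:
    """Encode the formatted result as UTF-8 bytes for Pub/Sub."""
    return line.encode("utf-8")


def run(input_topic: str, output_topic: str, window_seconds: int = 1 * 60) -> None:
    """Build and run the streaming pipeline (requires the apache-beam package)."""
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms import window

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read articles" >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Decode articles" >> beam.Map(decode)
            | "Fixed windows" >> beam.WindowInto(window.FixedWindows(window_seconds))
            | "Split words" >> beam.FlatMap(split_words)
            | "Form pairs" >> beam.Map(lambda word: (word, 1))
            | "Group and sum" >> beam.CombinePerKey(sum)
            | "Format result" >> beam.Map(format_result)
            | "Encode result" >> beam.Map(encode)
            | "Publish result" >> beam.io.WriteToPubSub(topic=output_topic)
        )
```

The topics are passed in the form projects/<project>/topics/<topic>; the concrete names depend on your own GCP project.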

So far, so good. You may be wondering where the trigger, the watermark and all the PCollections are in the code. Good questions.

Trigger: If we use FixedWindows, we don't need an explicit trigger. The default trigger applies implicitly and fires once per window when the window ends, in our case every 60 seconds.

Watermark: Since we use FixedWindows and the timestamp refers to the entry into the pipeline (we have not defined an external timestamp), we do not need to handle the watermark explicitly. In our example, we could, however, use the watermark as an explicit trigger and accept late data at the same time. This could be implemented as follows:


Fig. 10: Example with watermark as trigger and allowed lateness


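In the example above, GlobalWindows was used instead of FixedWindows, with AfterWatermark as the explicit trigger. The accumulation mode, passed as the last argument, determines what happens to the elements of a pane once the trigger has fired; here it is used for cleanup.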
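How the watermark and the allowed lateness interact can be modelled in a few lines of plain Python. This is a deliberately simplified sketch of the concept, not Beam's implementation; the function classify and its return values are hypothetical, and the handling of the exact boundary values is my own assumption.

```python
def classify(window_end: float, watermark: float, allowed_lateness: float) -> str:
    """Classify an element arriving for a window, given the current watermark.

    Simplified model: the watermark is the point in event time up to which
    the system assumes that all data has arrived.
    """
    if watermark < window_end:
        return "on_time"   # the window is still open
    if watermark < window_end + allowed_lateness:
        return "late"      # the window has fired, but late data is still accepted
    return "dropped"       # the window has expired, the element is discarded


# Window ends at 60 s, late data is accepted for another 30 s.
for watermark in (50, 75, 95):
    print(watermark, classify(window_end=60, watermark=watermark, allowed_lateness=30))
```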


PCollections are passed implicitly. The function beam.Map(lambda x: (x, 1)) returns a PCollection. The Runner passes this to the following function beam.CombinePerKey(sum) for us. We do not need to program this transfer explicitly.


This brings us to the end of our introduction to streaming applications in general and the basics of Apache Beam as a unified programming model for developing streaming applications.


Efficient and scalable execution with Dataflow

An Apache Beam application must be executed by a runner. The Apache Beam SDK includes a local runner for this purpose, but it is not intended for production use. With Dataflow, Google provides a secure, scalable and managed runner. Deploying an Apache Beam application in Dataflow is extremely easy and not much more complicated than running it locally:


Fig. 11: Apache Beam deployment on Dataflow

This is an example for Python. Make sure that the Apache Beam SDK is installed. You can then execute the deployment directly.
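The difference between local execution and Dataflow essentially comes down to a handful of pipeline options. The following sketch assembles them as a list of arguments. The helper dataflow_args and all values (project, region, bucket, job name) are hypothetical placeholders of mine; the option names themselves are the usual pipeline options for the Dataflow runner.

```python
from typing import List


def dataflow_args(project: str, region: str, bucket: str, job_name: str) -> List[str]:
    """Assemble pipeline options for a streaming job on Dataflow."""
    return [
        "--runner=DataflowRunner",              # instead of the local runner
        f"--project={project}",                 # GCP project that runs the job
        f"--region={region}",                   # region of the Dataflow job
        f"--temp_location=gs://{bucket}/temp",  # bucket for temporary files
        f"--job_name={job_name}",
        "--streaming",                          # unbounded source such as Pub/Sub
    ]


# Placeholder values; replace them with those of your own project.
args = dataflow_args("my-project", "europe-west6", "my-bucket", "wordcount-stream")
print(" ".join(args))
```

In the pipeline program, such a list can be passed to PipelineOptions, or the same options can be given on the command line when starting the program.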


It goes without saying that, depending on the application and landing zone, further preparatory work may be necessary. If you are using Pub/Sub, you may need to set up the corresponding topics. Along the way, you will be asked to enable numerous GCP APIs if they are not already enabled. Complete instructions for these steps can be found in the Dataflow documentation, for example.

Real-time transaction monitoring as a stream

At least in Switzerland, the Embargo Act and its ordinances, among other things, effectively oblige banks to check the senders and recipients of payments against “sanction lists”. Depending on the risk assessment, such checks are carried out against further personal and entity databases. These checks should be carried out before the actual execution of the payment. If the payment is executed first, this may already constitute a violation. From a technical point of view, this means that the transaction check must be carried out “in real time”. This requirement can be met by various architecture patterns. Each has its advantages and disadvantages, which need to be weighed and considered. I would like to briefly introduce three different architecture patterns and discuss their advantages and disadvantages.


Synchronous in one step:

In this model, the actual payment transaction function synchronously calls the name-matching function in its own process. It waits for the result and then continues processing.


Advantages:

This pattern has the advantage that the process logic is in one place and the function can be tested relatively easily. With unit tests against a mock service, the function can already be tested quite comprehensively.

Disadvantages:

The disadvantages of this pattern lie mainly in performance, and possibly in costs and maintainability.

Performance: The payment application has to wait for the response. With a large number of payments, many more instances of the function have to be available. This requires more RAM and CPU; otherwise there is a risk that the entire application will slow down.

Costs: Depending on how the name-matching function is implemented and operated, higher costs may be incurred. This is especially true when the function is operated with a model such as Cloud Functions, where the number of calls has a direct influence on the costs.

Maintainability: The payment function itself has to implement all error handling, retry mechanisms, etc. This turns the application into a monolith that becomes more difficult to maintain over time and makes changes more complex.


Decoupling with pub/sub:

In this pattern, the process is decoupled. The payment application performs the previous checks such as credit checks, etc. At the point of name checking, it stores the status and publishes the payment in Pub/Sub. This triggers the name matching function, which performs the check, stores the status and, in the positive case, triggers the payment application via Pub/Sub to execute the payment.
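The flow of this pattern can be sketched in plain Python. Two in-memory queues stand in for the Pub/Sub topics, and the function is_sanctioned stands in for the name-matching function; all names and status values are hypothetical and only serve to illustrate the sequence of steps.

```python
from collections import deque
from typing import Callable, Dict, List


def run_decoupled(
    payments: List[Dict[str, str]], is_sanctioned: Callable[[str], bool]
) -> Dict[str, str]:
    """Simulate the decoupled flow and return the final status per payment."""
    status: Dict[str, str] = {}
    check_topic = deque()    # stand-in for the Pub/Sub topic "to be checked"
    execute_topic = deque()  # stand-in for the Pub/Sub topic "cleared"

    # Payment application: store the status and publish the payment.
    for payment in payments:
        status[payment["id"]] = "NAME_CHECK_PENDING"
        check_topic.append(payment)

    # Name-matching function: check, store the status, publish if clear.
    while check_topic:
        payment = check_topic.popleft()
        if is_sanctioned(payment["recipient"]):
            status[payment["id"]] = "BLOCKED"
        else:
            status[payment["id"]] = "CLEARED"
            execute_topic.append(payment["id"])

    # Payment application: execute the cleared payments.
    while execute_topic:
        status[execute_topic.popleft()] = "EXECUTED"
    return status


payments = [
    {"id": "tx-1", "recipient": "Hans Müller-Meier"},
    {"id": "tx-2", "recipient": "Listed Person"},
]
final = run_decoupled(payments, lambda name: name == "Listed Person")
print(final)
```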

Advantages:

With this pattern, we keep the payment application lean and easy to maintain. Performance remains virtually unchanged, and publishing in pub/sub requires few resources. During the name matching check, the payment application can use its resources for other payments.

Disadvantages:

The potential cost disadvantage of the first pattern remains, since each check is executed individually. Compared to the first pattern, this solution requires more testing. The test environment is more complex, the unit tests are limited to smaller units, and the informative value is correspondingly lower.

Transaction testing as a stream:

In this pattern, the transaction check is treated as a stream. Payments arrive continuously and need to be checked. As in the previous pattern, the payment application publishes the payment in Pub/Sub. However, the consumer of this Pub/Sub topic is not a function but a stream application. This application processes the payments in defined windows, ideally of a fixed size combined with a maximum time span: for example, a window closes after 10 transactions or after 30 seconds, whichever comes first. Depending on the capabilities of the payment application, the stream application then triggers the payment application via Pub/Sub for execution, either per payment or in batch.

Advantages:

With this pattern, we can call the name matching function in batches. This means we can have 10 payments checked at once, for example. This reduces the network load and the number of function calls. The advantages of the previous pattern remain.


Disadvantages:

As with the previous solution, the testing effort for this variant is higher compared to the first solution, and the informative value of the individual unit tests is more limited.

We almost have the tools for a corresponding Apache Beam application. The only thing missing is the combined window function. We always want to combine 10 transactions. However, if fewer transactions arrive, we don't want to wait long; after all, the transactions should be executed quickly. That's why we defined the second condition: a maximum of 30 seconds after the first transaction arrives in the new window. In the worst case, a check therefore starts 30 seconds after the first transaction has arrived, namely when no more than 8 further transactions follow in that time. For this we can use the familiar global window with custom triggers. With “AfterAny” we can combine several triggers with a logical OR. Our window definition could look like the following:


Fig. 12: Combined window with Repeatedly and AfterAny
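As a sketch of what such a definition can look like in the Python SDK, and how it behaves, here are two small functions. combined_window() builds the window transform from Repeatedly, AfterAny, AfterCount and AfterProcessingTime; the discarding accumulation mode is my assumption, since each transaction should only be checked once. batch_transactions() is a plain-Python simulation of the same rule, written by me purely for illustration, so that the behaviour can be tried out without a runner.

```python
from typing import List, Tuple


def combined_window(max_size: int = 10, max_wait_seconds: int = 30):
    """Global window that fires after max_size elements or max_wait_seconds.

    Requires the apache-beam package; imported here so that the simulation
    below also works without it.
    """
    import apache_beam as beam
    from apache_beam.transforms import trigger, window

    return beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(
            trigger.AfterAny(
                trigger.AfterCount(max_size),
                trigger.AfterProcessingTime(max_wait_seconds),
            )
        ),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
    )


def batch_transactions(
    arrivals: List[Tuple[float, str]], max_size: int = 10, max_wait: float = 30.0
) -> List[List[str]]:
    """Simulate the rule on (arrival_time, transaction) pairs sorted by time."""
    batches: List[List[str]] = []
    current: List[str] = []
    deadline = 0.0
    for arrival_time, transaction in arrivals:
        if current and arrival_time >= deadline:
            batches.append(current)  # time limit reached before this arrival
            current = []
        if not current:
            deadline = arrival_time + max_wait  # clock starts with first element
        current.append(transaction)
        if len(current) == max_size:
            batches.append(current)  # size limit reached
            current = []
    if current:
        batches.append(current)      # remaining elements fire on the timer
    return batches


print(batch_transactions([(0, "tx-1"), (10, "tx-2"), (35, "tx-3")]))
```

In the simulation, the third transaction arrives after the 30-second limit of the first batch and therefore opens a new one.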

Let's move on to the name-matching part. In a payment, we usually do not have a structured name from the recipient. This means that we often have a complete address in a field. This may well include several people. For example:


Mr. and Mrs.
Hans and Erika Müller-Meier

Musterstrasse 24

8001 Zurich

Switzerland

We cannot rely on a well-structured complete address. It could also look like this:


Mr. Hans and Erika Müller-Meier

Muster Street 24, CH-8001 Zurich


This is where the ESC function “findByAddress” comes into play. We can pass a complete address as a string to this function to search for names in the index. We do not need to worry about the structure of the address.

For the batch check, we can extend our sample application from the Cloud Series article “Running an Apache ESC index application serverless with Cloud Run” with an endpoint that takes a list of strings (addresses) with associated transaction numbers as input, checks all of them and returns the result:


Fig. 13: Code extension with a new REST API endpoint – checking a list of addresses
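Independent of the concrete implementation in the sample application, the core logic of such a batch endpoint can be sketched in Python. All names here are hypothetical: the callable find_by_address is only a stand-in for the ESC function “findByAddress”, whose actual signature and return type are not shown in this article, and the request and response types are my own assumptions.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass
class AddressCheck:
    """One entry of the request: a transaction number and its free-text address."""
    transaction_id: str
    address: str


@dataclass
class CheckResult:
    """One entry of the response: the hits found for a transaction."""
    transaction_id: str
    hits: List[str]


def check_batch(
    items: Iterable[AddressCheck],
    find_by_address: Callable[[str], Iterable[str]],
) -> List[CheckResult]:
    """Check every address of the batch and keep the transaction number."""
    return [
        CheckResult(item.transaction_id, list(find_by_address(item.address)))
        for item in items
    ]


def fake_matcher(address: str) -> List[str]:
    """Toy matcher for demonstration only: 'finds' one hard-coded name."""
    return ["Hans Müller-Meier"] if "Müller-Meier" in address else []


results = check_batch(
    [
        AddressCheck("tx-1", "Mr. Hans and Erika Müller-Meier, CH-8001 Zurich"),
        AddressCheck("tx-2", "Example Ltd., Example Road 1, London"),
    ],
    fake_matcher,
)
print(results)
```

Returning the transaction number with every result allows the stream application to assign the hits back to the individual payments of a batch.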




Fig. 14: Local query of the new REST API endpoint

With this practical example, I hereby conclude the brief introduction to streaming applications with Apache Beam and their execution in Dataflow. I hope you were able to take away some architectural inspiration.

