Introduction to Apache Beam with Java

Key takeaways

  • Apache Beam is a powerful open-source project for batch and stream processing
  • Portability lets pipelines run on different backends, from Apache Spark to Google Cloud Dataflow
  • Beam is extensible, which means you can write and share new SDKs, I/O connectors, and transformation libraries
  • Beam currently supports Python, Java, and Go
  • With the Java SDK, you can take full advantage of the JVM

In this article, we will introduce Apache Beam, a powerful open source batch processing and streaming project used by major companies like eBay to integrate their streaming pipelines and by Mozilla to securely transfer data between their systems.

Summary

Apache Beam is a programming model for data processing that supports both batch and streaming.

Using the SDKs available for Java, Python, and Go, you can develop pipelines and then choose the backend that will manage the pipeline.

Advantages of Apache Beam

The Beam Model (Frances Perry & Tyler Akidau)

  • Integrated I/O connectors
    • Apache Beam connectors allow easy extraction and loading of data from many types of storage
    • The main types of connectors are:
      • File-based (e.g. Apache Parquet, Apache Thrift)
      • File system (e.g. Hadoop, Google Cloud Storage, Amazon S3)
      • Messaging (e.g. Apache Kafka, Google Pub/Sub, Amazon SQS)
      • Database (e.g. Apache Cassandra, Elasticsearch, MongoDB)
    • As an OSS project, support for new connectors keeps growing (e.g. InfluxDB, Neo4j)
  • Portability:
    • Beam provides multiple runners to execute pipelines, allowing you to choose the best one for each use case and avoid vendor lock-in.
    • Distributed processing backends such as Apache Flink, Apache Spark or Google Cloud Dataflow can be used as runners.
  • Distributed parallel processing:
    • Each element in the dataset is handled independently by default, so processing can be scaled by running in parallel.
    • Developers do not need to distribute the load manually between workers as Beam provides an abstraction for it.

The Beam Model

The basic concepts in the Beam programming paradigm are:

  • PCollection: represents a set of data, for example a set of numbers or words extracted from a text.
  • PTransform: a transformation function that receives one or more PCollections and returns a new PCollection, for example summing all the numbers.
  • Pipeline: manages the interactions between PTransforms and PCollections.
  • PipelineRunner: Defines where and how the pipeline will be executed.
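
To make the mapping from concepts to code concrete, here is a minimal sketch (variable names are illustrative, and imports from org.apache.beam.sdk.* are omitted, as in the rest of this article):

// Pipeline: manages the graph of PTransforms and PCollections
Pipeline pipeline = Pipeline.create();

// PCollection: an immutable, distributed data set, created here from memory
PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));

// PTransform: a processing step that consumes and produces PCollections
PCollection<Integer> doubled = numbers.apply(
        MapElements.into(TypeDescriptors.integers()).via((Integer n) -> n * 2));

// PipelineRunner: decided by the configured options; with no configuration,
// the direct (local) runner is used
pipeline.run().waitUntilFinish();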

Quick start

The basic pipeline operation consists of three steps: reading the input, processing it, and writing the result of the transformation. Each of these steps is defined programmatically using one of the Apache Beam SDKs.

In this section, we will create pipelines using the Java SDK. You can either create a local application (using Gradle or Maven) or use the Online Playground. The examples use the local (direct) runner, as it makes it easy to verify the results with JUnit assertions.

Java local dependencies

  • beam-sdks-java-core: Contains all the classes of the Beam model.
  • beam-runners-direct-java: By default, the Apache Beam SDK will use the direct runner, which means the pipeline will run on your local machine.
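
As a sketch, the corresponding Maven dependencies could look like the following (the version property is a placeholder; use the latest Beam release):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
</dependency>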

Multiply by 2

In this first example, the pipeline will receive an array of numbers and will output each element multiplied by 2.

The first step is to create the pipeline instance that will receive the input array and run the transform function. Since we are using JUnit to run Apache Beam, we can easily create a TestPipeline as a test class attribute. If you prefer to run the pipeline from a regular main application instead, you will need to set the pipeline configuration options yourself (a sketch follows the snippet below).

@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
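
If the pipeline runs from a main method, a minimal sketch of the setup would be (PipelineOptionsFactory is the standard way to parse options such as --runner from command-line arguments):

// Parse pipeline options from the command line, e.g. --runner=DirectRunner
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

// Create the pipeline with those options; when no runner is set, the direct runner is used
Pipeline pipeline = Pipeline.create(options);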

Now we can create a PCollection that will be used as input to the pipeline. It will be created directly from an in-memory array, but it could be read from any source Apache Beam supports:

PCollection<Integer> numbers =
        pipeline.apply(Create.of(1, 2, 3, 4, 5));

Then we apply a transformation function that will multiply each element of the dataset by two:

PCollection<Integer> output = numbers.apply(
        MapElements.into(TypeDescriptors.integers())
                .via((Integer number) -> number * 2)
);

To check the results, we can write an assertion:

PAssert.that(output)
            	  .containsInAnyOrder(2, 4, 6, 8, 10);

Note that the results are not expected to arrive in the same order as the input, because Apache Beam processes each element independently and in parallel.

The test is done at this point, and we run the pipeline by calling:

pipeline.run();
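
Putting the fragments together, the whole scenario can be expressed as a single JUnit 4 test (the class and method names are illustrative; the Beam imports come from org.apache.beam.sdk.*):

public class MultiplyByTwoTest {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testMultiplyByTwo() {
        // Input PCollection created from an in-memory list
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5));

        // PTransform that doubles each element
        PCollection<Integer> output = numbers.apply(
                MapElements.into(TypeDescriptors.integers())
                        .via((Integer number) -> number * 2));

        // Ordering is not guaranteed, so assert on the contents only
        PAssert.that(output).containsInAnyOrder(2, 4, 6, 8, 10);

        pipeline.run();
    }
}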

Reduce operation

A reduction combines a group of multiple input elements into a smaller group, usually containing a single element.

MapReduce (Frances Perry & Tyler Akidau)

Now let's extend the example above to sum all the elements multiplied by two, which results in a MapReduce transformation.

Each PCollection transformation results in a new PCollection instance, which means we can chain transformations using the apply method. In this case, the sum operation is applied after each entry is multiplied by 2:

PCollection<Integer> numbers =
        pipeline.apply(Create.of(1, 2, 3, 4, 5));

PCollection<Integer> output = numbers
        .apply(
                MapElements.into(TypeDescriptors.integers())
                        .via((Integer number) -> number * 2))
        .apply(Sum.integersGlobally());

PAssert.that(output)
        .containsInAnyOrder(30);

pipeline.run();

FlatMap operation

FlatMap is an operation that first applies a map function to each input element, which usually returns a new collection, resulting in a collection of collections. A flatten operation is then applied to merge all the nested collections into a single one.

The following example transforms arrays of strings into a single collection containing each word.

First, we declare the list of words that will be used as input to the pipeline:

final String[] WORDS_ARRAY = new String[] {
        "hi bob", "hello alice", "hi sue"};

final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

Then we create the input PCollection using the above list:

PCollection<String> input = pipeline.apply(Create.of(WORDS));

Now we apply a FlatMap transform, which will split the words in each nested array and merge the results into a single list:

PCollection<String> output = input.apply(
        FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split(" ")))
);

PAssert.that(output)
      .containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");

pipeline.run();

Group operation

A common function of data processing is grouping or counting by a particular key. We will illustrate this by counting the number of occurrences of each word from the previous example.

After we get the flattened collection of words, we can chain another PTransform:

PCollection<KV<String, Long>> output = input
        .apply(
                FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split(" ")))
        )
        .apply(Count.perElement());

which results in:

PAssert.that(output)
        .containsInAnyOrder(
                KV.of("hi", 2L),
                KV.of("hello", 1L),
                KV.of("alice", 1L),
                KV.of("sue", 1L),
                KV.of("bob", 1L));

Reading from a file

One of the principles of Apache Beam is to read data from anywhere, so let’s see in action how to use a text file as a data source.

The following example will read the file "words.txt", whose content is "An advanced unified programming model". The transform function will then return a PCollection containing each word of the text.

PCollection<String> input =
        pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

PCollection<String> output = input.apply(
        FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split(" ")))
);

PAssert.that(output)
      .containsInAnyOrder("An", "advanced", "unified", "programming", "model");

pipeline.run();

Write output to a file

As shown in the previous input example, Apache Beam has several built-in output connectors. In the following example, we will count the occurrences of each word in the text file "words.txt", which contains only one sentence ("An advanced unified programming model"), and the output will be persisted in text file format.

PCollection<String> input =
        pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

PCollection<KV<String, Long>> output = input
        .apply(
                FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split(" ")))
        )
        .apply(Count.perElement());

PAssert.that(output)
        .containsInAnyOrder(
                KV.of("An", 1L),
                KV.of("advanced", 1L),
                KV.of("unified", 1L),
                KV.of("programming", 1L),
                KV.of("model", 1L)
        );

output
        .apply(
                MapElements.into(TypeDescriptors.strings())
                        .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
        .apply(TextIO.write().to("./src/main/resources/wordscount"));

pipeline.run();

File writing is optimized for parallelism by default, which means Beam will choose the best number of shards (files) to persist the result. The files will be located in the src/main/resources folder and will have the prefix "wordscount", the shard number, and the total number of shards, as specified in the last output transformation.

When running it on my laptop, four shards were created:

First shard (file name: wordscount-00001-of-00003):

An 1
advanced 1

Second shard (file name: wordscount-00002-of-00003):

unified 1
model 1

Third shard (file name: wordscount-00003-of-00003):

programming 1

The last shard was created but ended up empty, because all the words had already been processed.
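
If a single output file is easier to inspect, the sharding can be fixed explicitly; withNumShards and withSuffix are standard options of TextIO.write() (a sketch based on the last step above):

output
        .apply(MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
        .apply(TextIO.write()
                .to("./src/main/resources/wordscount")
                .withNumShards(1)       // write everything into a single shard
                .withSuffix(".txt"));   // e.g. wordscount-00000-of-00001.txt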

Extending Apache Beam

We can take advantage of Beam's extensibility by writing a custom transform function. A custom transform improves code maintainability and also eliminates duplication.

Basically, we need to create a subclass of PTransform, declaring the input and output types as Java generics. Then we override the expand method and place inside it the logic that receives a single line of text and returns a PCollection containing each word.

public class WordsFileParser extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> expand(PCollection<String> input) {
        return input
                .apply(FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split(" ")))
                );
    }
}

The test scenario refactored to use WordsFileParser now looks like this:

public class FileIOTest {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testReadInputFromFile() {
        PCollection<String> input =
                pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

        PCollection<String> output = input.apply(
                new WordsFileParser()
        );

        PAssert.that(output)
                .containsInAnyOrder("An", "advanced", "unified", "programming", "model");

        pipeline.run();
    }

    @Test
    public void testWriteOutputToFile() {
        PCollection<String> input =
                pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));

        PCollection<KV<String, Long>> output = input
                .apply(new WordsFileParser())
                .apply(Count.perElement());

        PAssert.that(output)
                .containsInAnyOrder(
                        KV.of("An", 1L),
                        KV.of("advanced", 1L),
                        KV.of("unified", 1L),
                        KV.of("programming", 1L),
                        KV.of("model", 1L)
                );

        output
                .apply(
                        MapElements.into(TypeDescriptors.strings())
                                .via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
                .apply(TextIO.write().to("./src/main/resources/wordscount"));

        pipeline.run();
    }
}

The result is a clearer pipeline and more modular units.

Windowing

Windowing in Apache Beam (Frances Perry & Tyler Akidau)

A common problem with stream processing is grouping incoming data by a certain period of time, especially when dealing with large amounts of data. In this case, analyzing the data collected for an hour or a day is more relevant than analyzing each element of the dataset.

In the following example, let's say we work for a fintech company, we receive transaction events containing an amount and the instant the transaction occurred, and we want to retrieve the total amount transacted per day.

Beam provides a way to decorate each PCollection element with a timestamp. We can use this to create a PCollection representing five financial transactions:

  • The amounts 10 and 20 were transferred on 2022-02-01
  • The amounts 30, 40, and 50 were transferred on 2022-02-05

PCollection<Integer> transactions =
      pipeline.apply(
            Create.timestamped(
                  TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),
                  TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),
                  TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),
                  TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),
                  TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))
               )
       );

Next, we’ll apply two transform functions:

  1. Group the transactions using a one-day window
  2. Sum the amounts in each window

PCollection<Integer> output =
        transactions
                .apply(Window.into(FixedWindows.of(Duration.standardDays(1))))
                .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());

In the first window (2022-02-01), the total amount is expected to be 30 (10 + 20), while in the second window (2022-02-05), we should see 120 (30 + 40 + 50) as the total amount.

PAssert.that(output)
        .inWindow(new IntervalWindow(
                Instant.parse("2022-02-01T00:00:00+00:00"),
                Instant.parse("2022-02-02T00:00:00+00:00")))
        .containsInAnyOrder(30);

PAssert.that(output)
        .inWindow(new IntervalWindow(
                Instant.parse("2022-02-05T00:00:00+00:00"),
                Instant.parse("2022-02-06T00:00:00+00:00")))
        .containsInAnyOrder(120);

Each IntervalWindow instance needs to match the exact start and end timestamps of the chosen window duration, which is why the chosen time of day must be '00:00:00'.

Conclusion

Apache Beam is a powerful, battle-tested data framework that allows both batch and streaming processing. We have used the Java SDK to build map, reduce, group, windowing, and other operations.

Apache Beam can be a perfect fit for developers who work with embarrassingly parallel tasks and want to simplify the mechanics of large-scale data processing.

Its connectors, SDKs, and support for multiple runners offer flexibility, and by choosing a cloud-native runner like Google Cloud Dataflow, you get automated management of computational resources.


