[JAVA] Let's get started with parallel programming

This article is the 12th day of Java Advent Calendar 2016.

The day before, it was leak4mk0's Easy database-Object-oriented database (JPA) even in Java SE. Next is tkxlab's Object-Oriented Database (JPA) in Java SE.

Introduction

It's been a few years since we entered the multi-core era, which is said to be the end of free lunch. The reason is that I often make web applications, but this year the wave of parallel programming has come to me, who was developing with almost single threads (although there are cases where I am aware of multithreading such as serve red). .. That's why this year's Advent Calendar summarizes an introduction to parallel programming.

Java 8 standard parallel API

Java has been touted for multithreaded programming from the beginning, and it is a language that makes parallel programming easy. It has been steadily expanded with the version upgrade, and currently has the following APIs.

Let's see how to use it one by one.

Thread class

"Forget about me"

That's half a joke, but in most cases you shouldn't use the plain Thread class directly.

I also started by using plain Thread based on old knowledge, and when I google with multithreading, it still hits the top, It's much easier to use Future and Parallel Stream, and what you can do with Thread is usually smarter with Executor.

Executor framework

A simple parallel processing API based on a thread pool. The process is broken down into particles called "Tasks", assigned to the thread pool, and executed. Since Task is expressed by a class or lambda expression that inherits the Runnable interface, it can be used effectively as "upward compatibility of Thread that supports thread pool".

Also, usually, we use an Executor (name is confusing!) Called Executor Service that can handle Future.

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newFixedThreadPool(2);
    try {
        es.execute(() -> System.out.println("executor:1, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:2, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:3, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:4, thread-id:" + Thread.currentThread().getId()));
        es.execute(() -> System.out.println("executor:5, thread-id:" + Thread.currentThread().getId()));
    } finally {
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
    }
}

The execution result is as follows.

executor:1, thread-id:11
executor:2, thread-id:11
executor:3, thread-id:12
executor:4, thread-id:12
executor:5, thread-id:12

You can see that the threads specified in newFixedThreadPool are being reused even though execute is executed 5 times. Executors is a factory that creates ExecutorService and has a method to create a thread pool with the following strategy.

Method name Overview
newCachedThreadPool A thread pool that creates new threads as needed, but reuses previously built threads when available
newFixedThreadPool Thread pool that reuses a specified fixed number of threads
newWorkStealingPool A thread pool that holds the maximum number of CPU cores or the specified number of parallels. A queue of tasks is assigned to each thread, and when the queue becomes free, tasks are intercepted from other threads.(Work Stealing)To process

newWorkStealingPool is a new method that appeared in Java 8. It will allocate threads efficiently, so if there is no problem, it will be adopted.

Fork/Join

Perform parallel programming using Fork and Join like Unix process management.

An API designed to perform CPU-heavy processing such as recursive divide-and-conquer. Because it uses ForkJoinPool, which uses the Work Stealing algorithm, it is designed to efficiently execute tasks with smaller granularity than the Executor.

However, unlike when Java 7 appeared, Java 8 also has a new WorkStealing Pool in the Executor, so I feel that the execution efficiency will not change much. (※ unconfirmed)

Since I am good at speeding up recursion, I will make a sample to find the familiar Fibonacci sequence.

First of all, a Fibonacci sequence made by ordinary recursion for comparison.

public static int fib(int n) {
    if (n == 0) {
        return 0;
    } else if (n == 1) {
        return 1;
    } else {
        return fib(n - 1) + fib(n - 2);
    }
}

public static void main(String[] args) {
    System.out.println(fib(45));
}

If this is implemented with ForkJoin, it will be as follows.

static class FibonacciTask extends RecursiveTask<Integer> {

    private final int n;

    FibonacciTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n == 0) {
            return 0;
        } else if (n == 1) {
            return 1;
        } else {
            FibonacciTask f1 = new FibonacciTask(n - 1);
            FibonacciTask f2 = new FibonacciTask(n - 2);

            f2.fork();
            return f1.compute() + f2.join();
        }
    }
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    System.out.println(pool.invoke(new FibonacciTask(45)));
}

I am writing a process by creating a FibonacciTask that inherits the RecursiveTask.

Originally, the acquisition of the values of f1: fib (n-1) and f2: fib (f-2), which were originally obtained by using recursion, is realized by f1 using recursion and f2 using fork / join. Is the point. Every time you fork, a new asynchronous task is created, so the degree of parallelism increases. Since f1 is a recursive calculation, processing is executed steadily without waiting for the result of f2. As a result, a large number of asynchronous tasks can be performed and parallel computing can proceed.

The invoke method is a method that "returns the wait value synchronously until the processing is completed". It's also a wrapper method for Task.fork.join.

However, maybe I made a mistake in writing something or it was an overhead problem, and in the code I wrote this time, the simple code that runs in a single thread is almost twice as fast as fork / join. .. .. So, when you actually use it, you should measure whether it will be faster. It will depend on the use case.

CompletableFuture

CompletableFuture is an API for realizing Future / promise, which is one of the parallel programming design patterns introduced from Java8. My understanding is that it is a design pattern for treating asynchronous programming like synchronous programming, and in the case of Java 8, asynchronousness is guaranteed by threads.

I will write it like this.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService es = Executors.newWorkStealingPool();

    //Simple asynchronous execution
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
    System.out.println(future.get());

    //Synthesis is also possible
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
    CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
    System.out.println(f2.get());

    //Packing in List or batch processing
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        final int n = i;
        futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
}

In this sample, ExecutorService is passed as an argument. By doing this, even if you control threads in the middle such as JavaEE, if you have an ExecutorService that supports it, you can use it without conflict. It seems that ForkJoinPool.commonPool () is used by default.

First, although it is a simple asynchronous execution, it is a form of executing with a lambda expression as an argument with supplyAsync or runAsync. By executing get, it waits for the end of Future and receives the return value.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "heavy process", es);
System.out.println(future.get());

The convenience of CompletableFuture is that you can handle the asynchronous process itself as a variable in the form of Future, so you can simply write a process that takes the result of a certain asynchronous process as an argument. For example, if you fetch a Web page, process it, and register it in the DB, you usually pass a callback function as an argument. This easily leads to callback hell.

In the case of CompletableFuture, you can use thenApply etc. to express the post-processing of such asynchronous processing without using a callback as shown below.

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "task1", es);
CompletableFuture<String> f2 = f1.thenApply((x) -> x + ":task2");
System.out.println(f2.get());

This kind of processing is called "synthesis", so if you're not used to it, you'll feel like "what is composition?", But I hope you'll understand that it's not that big of a deal.

Also, since CompletableFuture is just a value, it can be packed in a List, or it can be executed simultaneously using an array (variadic argument in terms of method) using allOf.

List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    final int n = i;
    futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + n), es));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));

There are many other functions of CompletableFuture, but I can't write them a little, so I'll omit them here.

ParallelStream

Speaking of parallelism introduced in Java8, many people think of this. ParallelStream for parallel collection operations.

Parallel collections have long been used in declarative, side-effect-free languages such as functional languages, but they were finally introduced in Java in Java 8. This is a very different taste than before, and it is a method that automatically manages concurrency on the system side instead of directly dealing with concurrency and asynchrony.

It limits what you can achieve, unlike explicit parallel operations, but it makes your code much simpler. It can also cover many other use cases for high speed requirements due to parallelization.

Let's look at the code first.

public static void main(String[] args){
    IntStream.range(1, 10).boxed()
            .parallel()
            .map(x -> "task" + x)
            .forEach(System.out::println);
}

It's almost indistinguishable from a normal Stream, right? The difference is that .parallel is added. Parallel processing can be assembled with just this, and since it is a Stream, it is natural to use the return value of the previous processing in the subsequent processing, so a story like "Future synthesis" is simple.

It's a very powerful and concise API, but there are some caveats. Remember that it's parallel programming.

Please note that if you use a variable or an external resource other than an environment that supports parallelism such as exclusion, it will be easily broken.

Non-standard library

In addition to the Java standard library, there are libraries that support parallelism. Most of the time I haven't tried it yet, but I'll just mention it.

Apache Spark

http://spark.apache.org/

A distributed processing platform for executing large-scale batches. As for the programming model, parallel collection is adopted as in Parallel Stream.

I have used this differently from other libraries, so please refer to the article written below.

-[Apache Spark introductory memo] Part 1-Development environment construction -Read binary fixed-length file with Apache Spark -Apache Spark as a distributed batch framework

Akka

http://doc.akka.io/docs/akka/snapshot/intro/getting-started.html

Library / middleware to realize the actor model. It's from Scala, but it can also be used in Java. Although it is a parallel programming library for handling actors that are lighter than threads, it is a recognition that is more commonly used as a distributed system infrastructure.

JCSP

https://www.cs.kent.ac.uk/projects/ofa/jcsp/

It seems to be a library for doing process algebra in Java. Process algebra is a method of dealing with parallel computing mathematically, so I would like to try it next time.

Summary

Well, it's easy, but I've summarized parallel programming in Java. I think that many people have a strong image of plain multithreading, but Java has changed a lot.

Basically, Parallel Stream, and in difficult cases, Completable Future is likely to become the mainstream in the future. Both do not necessarily use common variables for passing values between threads, so I think it's easier to write safe code. I didn't introduce it this time, but there are cases where you can use non-blocking collections even if you absolutely need common variables, so I think that you will use them as needed.

Recently, beyond parallel programming on a single machine, large-scale parallelism using distributed systems has become familiar thanks to the cloud. I want to study hard around here as well.

Then next year will be Happy Hacking!

reference

Recommended Posts

Let's get started with parallel programming
Get started with Gradle
Let's get started with Java-Create a development environment ②
Let's get started with Java-Create a development environment ①
Maybe it works! Let's get started with Docker!
Get started with Spring boot
Get started with DynamoDB with docker
Get started with "Introduction to Practical Rust Programming" (Day 3)
How to get started with slim
I tried to get started with WebAssembly
[Note] How to get started with Rspec
Get started with "Introduction to Practical Rust Programming" (Day 4) Call Rust from Ruby
How to get started with Eclipse Micro Profile
Rails beginners tried to get started with RSpec
I started programming diary
Experienced Java users get started with Android application development
[My memo] Let's get along with Pry / DB with Rails
Getting Started with DBUnit
I tried to get started with Spring Data JPA
Getting Started with Ruby
Getting Started with Swift
Let's scrape with Java! !!
Getting Started with Docker
Getting Started with Doma-Transactions
How to get started with creating a Rails app
Get started with serverless Java with the lightweight framework Micronaut!
First year Java developers on udemy get started with PHP
How to get started with Gatsby (TypeScript) x Netlify x Docker
Now is the time to get started with the Stream API
How to get started with JDBC using PostgresSQL on MacOS
I tried to get started with Swagger using Spring Boot
Getting Started with Doma-Annotation Processing
Getting Started with Java Collection
Let's experiment with Java inlining
Amazing Java programming (let's stop)
[Form_with] Let's unify form with form_with.
Let's operate Excel with Java! !!
Getting Started with JSP & Servlet
Getting Started with Java Basics
Getting Started with Spring Boot
Getting Started with Ruby Modules
Easy code review to get started with Jenkins / SonarQube: Static analysis