Update your Java knowledge by writing a gRPC server in Java (2)

I just write and understand the gRPC server, but I don't know much about it and I can't move on. However, try it slowly in your own way. The second topic is about the Future and Callable interfaces. Without understanding this, gRPC cannot be understood. This area has a scent that seems to be similar to the C # Task and async / await mechanism, so as a theme

I would like to implement ** FanOut / FanIn ** in Java.

Callable interface

The interface is such that it simply returns a T-type return value.

@FunctionalInterface
public interface Callable<V> {
/**
 * Computes a result, or throws an exception if unable to do so.
 *
 * @return computed result
 * @throws Exception if unable to compute a result
 */
 
V call() throws Exception;
}

Let's create an implementation class that implements this. This is the part that works in parallel. Since it is assumed that multiple threads will run in parallel, the id of CurrentThread is displayed. One thing I'm curious about here is, was it okay to use Thread.sleep ()? I'm saying that. With C # async / await, this is illegal, because async calls don't always occupy Thread, so it's inconvenient to put Thread to sleep. I read Java Delay-4 Ways to Add Delay in Java, but I didn't get any comments, so I'll use it for now.

Activity

public class Activity implements Callable<String> {
    public String call() throws Exception {
        Thread currentThread = Thread.currentThread();
        for (int i = 0; i < 100; i++) {
            Thread.sleep(100);
            System.out.println("[" + currentThread.getId() + "] Activity: " + i);
        }
        System.out.println("[" + currentThread.getId() +"] Done");
        return "Thread [" + currentThread.getId() + "] has done.";
    }
}

Now, create a class that executes this. I want to process Activities in parallel, and when all are finished, I want to display the result and finish. For Java, it seems to use something called ʻExecutorService. Multiple methods were defined in ʻExecutors, and the ones I'm using now are defined to create a fixed number of thread pools, create a single thread pool, cache, and so on. In this case, it has 5 thread pools. So, unlike C #, it occupies the thread sometimes, so is it okay to use Thread.sleep ()?

Future interface Future is an image of the equivalent of Task in C #. This object is returned as a return value when the thread is executed, and parallel work is started. Issuing the get method blocks until the thread finishes processing.

public interface Future<V> 
{
    boolean cancel(boolean mayInterruptIfRunning);
 
    boolean isCancelled();
 
    boolean isDone();
 
    V get() throws InterruptedException, ExecutionException;
 
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

The ʻexecutor.submit method creates this Future. If you pass ʻActivity earlier, the processing will be executed in parallel. Store the Future object in the list. Then, wait at ʻexecutor.awaitTermination. The final result is obtained and displayed by the get ()` method. However, there is a problem here.

public class FanOutFanIn {
    public void execute() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Future<String>> results = new ArrayList<>();

        for (int i = 0; i < 10 ; i++) {
            Future<String> result = executor.submit(new Activity());
            results.add(result);
        }

        executor.awaitTermination(30, TimeUnit.SECONDS);

        for (int i = 0; i < results.size(); i++) {
            System.out.println(results.get(i).get());
        }

    }
}

My original image was that all the processing was completed or the execution result was displayed with a timeout, but that is not the case. ʻAwaitTermination` keeps blocking even after all the processing is finished, and with this code, it will not proceed to the next block until 30 seconds have passed.

CompletableFuture

Therefore, there seems to be something called Completabled Future. The actual interface implementation is long, so I'll omit it, but it's a class that implements the Future and CompletionStage interfaces. Of the Future, it has a definite" completion ". This seems to be useful for a variety of purposes, but it also seems to be suitable for FanOut / FanIn scenarios. Write the one above that works the way you want.

CompletableFuture is implemented by Runnable because Callable cannot be used. It's a bit clunky, and I think it could be written better, but Runnable can't return a return value. If I wanted to behave the same as the previous code (returning a String), it's a pain, but I implemented a method called getResult. Yup. There must be a better way to write it. Please leave a comment.

public class RunnableActivity implements Runnable {
    private String result;
    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("[" + currentThread.getId() + "] Activity: " + i);
        }
        System.out.println("[" + currentThread.getId() +"] Done");
        this.result =  "Thread [" + currentThread.getId() + "] has done.";
    }

    public String getResult() {
        return this.result;
    }
}

I'm writing runAsync myself, but I'm writing it myself to force the return value of Runnable back. Yup. There must be a better way w. It's similar to the previous one, but since I'm using CompletableFuture, I'm using Runnable, and with the CompletableFuture.allOf method, this time it feels good and all ends. Will wait for you. This is the one I wanted.

When it's done, you can use the same loop as last time, but when all the threads are finished properly, it's written in Linq style, and the value is fetched as lambda and output.

public class CompletableFanOutFanIn {

        public void execute() throws InterruptedException, ExecutionException {
           ExecutorService executor = Executors.newFixedThreadPool(5);
           // ExecutorService executor = Executors.newSingleThreadExecutor();
            List<CompletableFuture<String>> results = new ArrayList<>();

            for (int i = 0; i < 10; i++) {
                CompletableFuture<String> result = runAsync(new RunnableActivity(), executor);
                results.add(result);
            }
            CompletableFuture<Void> cf = CompletableFuture.allOf(results.toArray(new CompletableFuture[10]));

            cf.whenComplete((ret, ex) -> {
               if(ex == null) {
                   String msg = results.stream().map(future -> {
                       try {
                           return future.get();
                       } catch (Exception iex) {
                           return iex.toString();
                       }
                   }).collect(Collectors.joining("\n"));
                   System.out.println("result = " + msg);
               } else {
                   System.err.println(ex);
               }
            });
        }

        private CompletableFuture<String> runAsync(RunnableActivity runnable, ExecutorService executor) {
            CompletableFuture<String> cf = new CompletableFuture<>();
            executor.execute(() -> {
                runnable.run();
                cf.complete(runnable.getResult());
            });
            return cf;
        }
}

Result

By the way, in C # Task, Thread.id is shared instead, but Java clearly seems to be one thread.

[23] Activity: 0
[22] Activity: 0
[21] Activity: 0
[20] Activity: 0
[24] Activity: 0
[20] Activity: 1
[23] Activity: 1
[22] Activity: 1
[24] Activity: 1
[21] Activity: 1
[20] Activity: 2
     :
[20] Activity: 97
[24] Activity: 97
[22] Activity: 97
[20] Activity: 98
[23] Activity: 98
[21] Activity: 98
[24] Activity: 98
[22] Activity: 98
[22] Activity: 99
[22] Done
[24] Activity: 99
[24] Done
[23] Activity: 99
[23] Done
[21] Activity: 99
[21] Done
[20] Activity: 99
[20] Done
result = Thread [20] has done.
Thread [21] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Thread [21] has done.
Thread [20] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.

Summary

To be honest, I'm still not confident in concurrent programs after Java8. Take your time and learn hard. Around Pluralsight. Don't rush, step by step. First of all, I was able to implement what I originally thought was the first step, so I'm happy today.

Recommended Posts

Update your Java knowledge by writing a gRPC server in Java (2)
Update your Java knowledge by writing a gRPC server in Java (1)
What I learned when building a server in Java
Implementing a large-scale GraphQL server in Java with Netflix DGS
Find a subset in Java
Make your own simple server in Java and understand HTTP
Think of a Java update strategy
Try implementing GraphQL server in Java
3 Implement a simple interpreter in Java
I created a PDF in Java.
A person writing C ++ tried writing Java
Handle your own annotations in Java
Implement a gRPC client in Ruby
A simple sample callback in Java
Get stuck in a Java primer
About returning a reference in a Java Getter
[Java] Divide a character string by a specified character
What is a class in Java language (3 /?)
When seeking multiple in a Java array
Duplicate Map sorted by key in Java
Differences in writing in Ruby, PHP, Java, JS
Key points for introducing gRPC in Java
Understand java interface in your own way
[Creating] A memorandum about coding in Java
Java creates a table in a Word document
Java creates a pie chart in Excel
What is a class in Java language (1 /?)
What is a class in Java language (2 /?)
Basic knowledge of Java development Note writing
Create a TODO app in Java 7 Create Header
Try making a calculator app in Java
Continued Talk about writing Java in Emacs @ 2018
Get history from Zabbix server in Java
Implement something like a stack in Java
Split a string with ". (Dot)" in Java
Creating a matrix class in Java Part 1
The story of writing Java in Emacs
Reading and writing gzip files in Java
Create a Docker container for your development web server in Ansible on MacOS
Let's create a versatile file storage (?) Operation library by abstracting file storage / acquisition in Java
Organize your own differences in writing comfort between Java lambda expressions and Kotlin lambda expressions.