I just write and understand the gRPC server, but I don't know much about it and I can't move on. However, try it slowly in your own way.
The second topic is about the Future
and Callable
interfaces. Without understanding this, gRPC
cannot be understood. This area has a scent that seems to be similar to the C # Task and async / await mechanism, so as a theme
I would like to implement ** FanOut / FanIn ** in Java.
The interface is such that it simply returns a T-type return value.
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
Let's create an implementation class that implements this. This is the part that works in parallel. Since it is assumed that multiple threads will run in parallel, the id of CurrentThread is displayed. One thing I'm curious about here is, was it okay to use Thread.sleep ()
? I'm saying that. With C # async / await, this is illegal, because async calls don't always occupy Thread, so it's inconvenient to put Thread to sleep. I read Java Delay-4 Ways to Add Delay in Java, but I didn't get any comments, so I'll use it for now.
Activity
public class Activity implements Callable<String> {
public String call() throws Exception {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
return "Thread [" + currentThread.getId() + "] has done.";
}
}
Now, create a class that executes this. I want to process Activities in parallel, and when all are finished, I want to display the result and finish.
For Java, it seems to use something called ʻExecutorService. Multiple methods were defined in ʻExecutors
, and the ones I'm using now are defined to create a fixed number of thread pools, create a single thread pool, cache, and so on. In this case, it has 5 thread pools. So, unlike C #, it occupies the thread sometimes, so is it okay to use Thread.sleep ()
?
Future interface
Future is an image of the equivalent of Task
in C #. This object is returned as a return value when the thread is executed, and parallel work is started. Issuing the get
method blocks until the thread finishes processing.
public interface Future<V>
{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
The ʻexecutor.submit method creates this
Future. If you pass ʻActivity
earlier, the processing will be executed in parallel. Store the Future
object in the list. Then, wait at ʻexecutor.awaitTermination. The final result is obtained and displayed by the
get ()` method. However, there is a problem here.
public class FanOutFanIn {
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 10 ; i++) {
Future<String> result = executor.submit(new Activity());
results.add(result);
}
executor.awaitTermination(30, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
System.out.println(results.get(i).get());
}
}
}
My original image was that all the processing was completed or the execution result was displayed with a timeout, but that is not the case. ʻAwaitTermination` keeps blocking even after all the processing is finished, and with this code, it will not proceed to the next block until 30 seconds have passed.
CompletableFuture
Therefore, there seems to be something called Completabled Future
. The actual interface implementation is long, so I'll omit it, but it's a class that implements the Future
and CompletionStage
interfaces. Of the Future
, it has a definite" completion ". This seems to be useful for a variety of purposes, but it also seems to be suitable for FanOut / FanIn
scenarios. Write the one above that works the way you want.
CompletableFuture
is implemented by Runnable
because Callable
cannot be used. It's a bit clunky, and I think it could be written better, but Runnable
can't return a return value. If I wanted to behave the same as the previous code (returning a String), it's a pain, but I implemented a method called getResult
. Yup. There must be a better way to write it. Please leave a comment.
public class RunnableActivity implements Runnable {
private String result;
@Override
public void run() {
Thread currentThread = Thread.currentThread();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[" + currentThread.getId() + "] Activity: " + i);
}
System.out.println("[" + currentThread.getId() +"] Done");
this.result = "Thread [" + currentThread.getId() + "] has done.";
}
public String getResult() {
return this.result;
}
}
I'm writing runAsync
myself, but I'm writing it myself to force the return value of Runnable back. Yup. There must be a better way w. It's similar to the previous one, but since I'm using CompletableFuture
, I'm using Runnable
, and with the CompletableFuture.allOf
method, this time it feels good and all ends. Will wait for you. This is the one I wanted.
When it's done, you can use the same loop as last time, but when all the threads are finished properly, it's written in Linq style, and the value is fetched as lambda and output.
public class CompletableFanOutFanIn {
public void execute() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(5);
// ExecutorService executor = Executors.newSingleThreadExecutor();
List<CompletableFuture<String>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
CompletableFuture<String> result = runAsync(new RunnableActivity(), executor);
results.add(result);
}
CompletableFuture<Void> cf = CompletableFuture.allOf(results.toArray(new CompletableFuture[10]));
cf.whenComplete((ret, ex) -> {
if(ex == null) {
String msg = results.stream().map(future -> {
try {
return future.get();
} catch (Exception iex) {
return iex.toString();
}
}).collect(Collectors.joining("\n"));
System.out.println("result = " + msg);
} else {
System.err.println(ex);
}
});
}
private CompletableFuture<String> runAsync(RunnableActivity runnable, ExecutorService executor) {
CompletableFuture<String> cf = new CompletableFuture<>();
executor.execute(() -> {
runnable.run();
cf.complete(runnable.getResult());
});
return cf;
}
}
Result
By the way, in C # Task, Thread.id is shared instead, but Java clearly seems to be one thread.
[23] Activity: 0
[22] Activity: 0
[21] Activity: 0
[20] Activity: 0
[24] Activity: 0
[20] Activity: 1
[23] Activity: 1
[22] Activity: 1
[24] Activity: 1
[21] Activity: 1
[20] Activity: 2
:
[20] Activity: 97
[24] Activity: 97
[22] Activity: 97
[20] Activity: 98
[23] Activity: 98
[21] Activity: 98
[24] Activity: 98
[22] Activity: 98
[22] Activity: 99
[22] Done
[24] Activity: 99
[24] Done
[23] Activity: 99
[23] Done
[21] Activity: 99
[21] Done
[20] Activity: 99
[20] Done
result = Thread [20] has done.
Thread [21] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
Thread [21] has done.
Thread [20] has done.
Thread [22] has done.
Thread [23] has done.
Thread [24] has done.
To be honest, I'm still not confident in concurrent programs after Java8. Take your time and learn hard. Around Pluralsight. Don't rush, step by step. First of all, I was able to implement what I originally thought was the first step, so I'm happy today.
Recommended Posts