[Java] Multi-thread processing --Exclusive control

Be careful of data sharing in multi-thread processing

NG example

//NG example
public class SynchronizedNotUse {
//Data shared by multiple threads
  private int value = 0;

//Execution of 100,000 threads
  public static void main(String[] args) {
    final int TASK_NUM = 100000;
    var th = new Thread[TASK_NUM];
    var tb = new SynchronizedNotUse();
    //Thread creation and execution
    for (var i = 0; i < TASK_NUM; i++) {
      th[i] = new Thread(() -> {
        tb.increment();
      });
      th[i].start();
    }
    //Wait until the thread ends
    for (var i = 0; i < TASK_NUM; i++) {
      try {
        th[i].join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(tb.value); //Different every time
  }
  //Increment the value field
  void increment() {
    this.value++;
  }
}

Exclusive control with synchronized block

public class SynchronizedUse {
  private int value = 0;

  public static void main(String[] args) {
    final int TASK_NUM = 100000;
    var th = new Thread[TASK_NUM];
    var tb = new SynchronizedUse();
    for (var i = 0; i < TASK_NUM; i++) {
      th[i] = new Thread(() -> {
        tb.increment();
      });
      th[i].start();
    }
    for (var i = 0; i < TASK_NUM; i++) {
      try {
        th[i].join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(tb.value);
  }
   //Specify the object to be locked as the current instance
  void increment() {
    synchronized(this) {
      this.value++;
    }
  }
}

synchronized modifier

synchronized void increment() {
    this.value++;
}

Explicit lock

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockBasic {
  private int value = 0;
  private final Lock lock = new ReentrantLock();

  public static void main(String[] args) {
    final int TASK_NUM = 100000;
    var th = new Thread[TASK_NUM];
    var tb = new LockBasic();
    for (var i = 0; i < TASK_NUM; i++) {
      th[i] = new Thread(() -> {
        tb.increment();
      });
      th[i].start();
    }
    for (var i = 0; i < TASK_NUM; i++) {
      try {
        th[i].join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(tb.value);
  }


  void increment() {
    //Rock acquisition
    lock.lock();
    try {
      this.value++;
    } finally {
      //unlock
      lock.unlock();
    }
  }
}

** tryLock method **

  if (lock.tryLock(10,TimeUnit.SECONDS)) {
    try {
    //Processing to be exclusively controlled
    } finally {
      lock.unlock();
    }
  }else{
  //What to do if you can't get a lock
}

Lock-free exclusive control

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicBasic {
  private AtomicInteger value = new AtomicInteger();

  public static void main(String[] args) {
    final int TASK_NUM = 100000;
    var th = new Thread[TASK_NUM];
    var tb = new AtomicBasic();
    for (var i = 0; i < TASK_NUM; i++) {
      th[i] = new Thread(() -> {
        tb.increment();
      });
      th[i].start();
    }
    for (var i = 0; i < TASK_NUM; i++) {
      try {
        th[i].join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    System.out.println(tb.value);
  }

  void increment() {
    value.getAndIncrement();
  }
}

Thread pool

public class ThreadPool implements Runnable {
  @Override
  public void run() {
    for (var i = 0; i < 30; i++){
      System.out.println(Thread.currentThread().getName() + ":" + i);
    }
  }
}
import java.util.concurrent.Executors;

public class ThreadPoolBasic {

  public static void main(String[] args) {
    //Create a thread pool with 10 threads
    var es = Executors.newFixedThreadPool(10);
    //Call and execute with execute method
    es.execute(new ThreadPool());
    es.execute(new ThreadPool());
    es.execute(new ThreadPool());
    es.shutdown();
  }
}

Schedule execution

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadSchedule {
  public static void main(String[] args) {
  //Thread pool preparation
    var sche = Executors.newScheduledThreadPool(2);
  //Schedule execution registration
    sche.scheduleAtFixedRate(() -> {
      System.out.println(LocalDateTime.now());
    }, 0, 5, TimeUnit.SECONDS);

    //Pause the main thread after waiting for the schedule to execute
    try {
      Thread.sleep(10000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    //Shut down the thread pool
    sche.shutdown();
  }
}

Receive the processing result of the thread

//Find a random number in another thread and pause for a few milliseconds
//Display that value in the main thread
import java.util.Random;
import java.util.concurrent.Callable;

//Callable<Integer>Assign Integer type with
public class ThreadCallable implements Callable<Integer> {
  @Override
  //Code to execute in the Callable interface
  public Integer call() throws Exception {
    var rnd = new Random();
    var num = rnd.nextInt(1000);
    Thread.sleep(num);
    return num;
  }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

public class ThreadCallableBasic {

  public static void main(String[] args) {
  //Thread execution
    var exe = Executors.newSingleThreadExecutor();
    var r1 = exe.submit(new ThreadCallable());
    var r2 = exe.submit(new ThreadCallable());
    var r3 = exe.submit(new ThreadCallable());

  //Thread result display
    //The return value of the submit method is Future<Integer>
    try {
      System.out.println("r1: " + r1.get());
      System.out.println("r2: " + r2.get());
      System.out.println("r3: " + r3.get());
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    exe.shutdown();
  }
}

Post-processing of thread processing

//Find a random number in another thread and pause for a few milliseconds
//Show its value

import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class FutureBasic {

  public static void main(String[] args) {
    //Asynchronous processing execution
    CompletableFuture.supplyAsync(() -> {
      var r = new Random();
      var num = r.nextInt(1000);
      heavy(num);
      return num;
    })
      //Processing after completion
      .thenAcceptAsync((result) -> {
        System.out.println(result);
      });
    System.out.println("...Any post-processing...");
    heavy(7000);
  }
  //Dummy processing (heavy) suspends processing only for the specified time
  public static void heavy(int num) {
    try {
      Thread.sleep(num);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Sort success / error handling

import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class FutureComplete {

  public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
      var r = new Random();
      var num = r.nextInt(1000);
      heavy(num);
      return num;
    })
      .whenCompleteAsync((result, ex) -> {
        //success
        if (ex == null) {
          System.out.println(result);
        } else {
        //Failure
          System.out.println(ex.getMessage());
        }
      });
    heavy(7000);
  }

  public static void heavy(int num) {
    try {
      Thread.sleep(num);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Execute multiple asynchronous processes in series

import java.util.Random;
import java.util.concurrent.CompletableFuture;

public class FutureSeq {

  public static void main(String[] args) {
  //Process 1(Random number generation)
    CompletableFuture.supplyAsync(() -> {
      var r = new Random();
      var num = r.nextInt(5000);
      heavy(2000);
      System.out.printf("Process 1: %d\n", num);
      return num;
    })
  //Process 2(Random number times)
    .thenApplyAsync((data) -> {
      var result = data * 2;
      heavy(2000);
      System.out.printf("Process 2: %d\n", result);
      return result;
    })
  //Process 3(Random number double)
    .thenAcceptAsync((data) -> {
      var num = data * 2;
      heavy(2000);
      System.out.printf("Process 3: %d\n", num);
    });
    heavy(7000);
  }

  public static void heavy(int num) {
    try {
      Thread.sleep(num);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Recommended Posts

[Java] Multi-thread processing --Exclusive control
[Java] Multi-thread processing
Java control syntax
Java control syntax
Java thread processing
Java string processing
[Java] Stream processing
java iterative processing
[Java] Control syntax notes
JAVA constructor call processing
Java random, various processing
Java version control on macOS
Version control Java with SDKMAN
Java version control with jenv
[Java] Stream API --Stream termination processing
[Java] Stream API --Stream intermediate processing
[Java] Timer processing implementation method
☾ Java / Iterative statement and iterative control statement
Measured parallel processing in Java
Understanding Java Concurrent Processing (Introduction)
[Java] Summary of control syntax
Summary of java error processing
[ev3 × Java] Display, sound, LED control
Date processing in Java (LocalDate: Initialization)
Delegate some Java processing to JavaScript
[Java] Loop processing and multiplication table
Run node.js from android java (processing)
[Processing × Java] How to use variables
Notes on signal control in Java
Server processing with Java (Introduction part.1)
Surprisingly deep Java list inversion-Stream processing
Basic processing flow of java Stream
About file copy processing in Java
Notes on Android (java) thread processing
Deleting files using recursive processing [Java]
[Processing × Java] How to use arrays
[Java] Processing time measurement method memo
[Java] Exception types and basic processing
This and that of exclusive control
[ev3 x Java] Single motor control