Skip to main content

Java Multithreading 多线程

1. Explain volatile variables in java?

2. How to create a new thread ? (Please also consider Thread Pool case)

Thread creation by extending the Thread class

写自定义类继承自 Thread类,然后实现 run()方法,并通过 start()方法启动线程。

public class Main {
public static void main(String[] args) {
Thread t = new MyThread();
t.start(); // 启动新线程
}
}

class MyThread extends Thread {
@Override
public void run() {
System.out.println("start a new thread");
}
}

Thread creation by using Lambda

class Main {
public static void main(String[] args) {
Thread t = new Thread(() -> System.out.println("start a new thread"));
t.start();
}
}

Thread creation by implementing the Runnable Interface

实现 Runable 接口,并在 run() 方法中写具体实现。创建 Thread 实例时,传入一个 Runnable 实例

public class Main {
public static void main(String[] args) {
Thread t = new Thread(new MyRunnable());
t.start(); // 启动新线程
}
}

class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("start new thread!");
}
}

Thread creation by ThreadPool

// 3. 上述代码展示了如何创建一个自定义的线程池,它使用 ArrayBlockingQueue 作为阻塞队列,并指定了核心线程数、最大线程数和线程空闲时间。
// 我们还设置了拒绝策略为 CallerRunsPolicy,这意味着当队列已满且无法创建更多线程时,提交任务的线程本身将执行任务。
// 代码中提交了 10 个任务到线程池,并在完成所有任务后优雅地关闭线程池。

import java.util.concurrent.*;

public class Main {
public static void main(String[] args) {
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
long keepAliveTime = 60L;

BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
queue,
new ThreadPoolExecutor.CallerRunsPolicy());

// Submit tasks to the custom thread pool
for (int i = 0; i < 10; i++) {
int taskNumber = i;
executor.execute(() -> {
System.out.println("Task " + taskNumber + " is executed by " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

// Gracefully shut down the thread pool
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Thread pool did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

3. CompletableFuture 如何保证每个请求能成功?

**CompletableFuture是Java 8引入的一种并发编程工具,用于异步处理任务并处理任务的结果。它实现了Future接口并提供了一系列功能强大的方法,使得异步编程更加简单。然而,使用CompletableFuture**并不能直接保证每个请求都能成功。要确保请求成功,需要根据请求的特点来设计错误处理策略。

为了确保每个请求成功,可以采用以下策略:

  1. 异常处理:在异步任务中处理潜在的异常。使用**exceptionally()handle()方法来处理CompletableFuture**中的异常。这些方法允许您在发生异常时提供替代结果或处理异常。
CompletableFuture.supplyAsync(() -> {
// 你的请求代码
}).exceptionally(ex -> {
// 异常处理逻辑
return defaultValue;
});
  1. 重试机制:当请求失败时,尝试重新执行请求。可以通过使用递归或循环来实现重试机制。这种方法通常与异常处理相结合,以便在请求失败时执行重试。
public CompletableFuture<T> retryRequest(int retries) {
return CompletableFuture.supplyAsync(() -> {
// 你的请求代码
}).exceptionally(ex -> {
if (retries > 0) {
return retryRequest(retries - 1).join();
}
// 在重试次数用完后,可以抛出异常或返回默认值
throw new RuntimeException("请求失败,重试次数已用完", ex);
});
}
  1. 超时控制:为请求设置合理的超时时间,避免长时间等待。使用**orTimeout()** 或**completeOnTimeout()** 方法来设置超时。这可以确保请求在规定的时间内完成,否则抛出异常或提供默认结果。
CompletableFuture.supplyAsync(() -> {
// 你的请求代码
}).orTimeout(5, TimeUnit.SECONDS);

4. Let's assume you have configured 10 threads in your application. Now your service has already created 10 threads, there is a new request that has come in spawn a new thread. What happens after that? How will Java handle this new thread?

提示: 阻塞队列 BlockingQueue

假设您在应用程序中配置了10个线程,这意味着您可能使用了一个固定大小的线程池。当您的服务已经创建了10个线程,并且有一个新的请求来创建新线程时,Java会根据线程池和阻塞队列的配置来处理这个新线程。

通常,在这种情况下,新的请求任务会被放入线程池的阻塞队列中,等待空闲线程可用。阻塞队列是一种数据结构,它允许在队列满时阻塞插入操作,以及在队列为空时阻塞获取操作。这意味着,当所有线程都在工作时,新任务会等待队列中的空位。

在Java中,java.util.concurrent 包提供了线程池和阻塞队列实现。例如,您可以使用 Executors.newFixedThreadPool() 方法创建一个固定大小的线程池,它内部使用 LinkedBlockingQueue 作为默认的阻塞队列。

当线程池中的某个线程完成其任务并变为空闲时,它会从阻塞队列中获取等待中的任务并开始执行。如果阻塞队列已满,根据线程池的拒绝策略,可能会抛出异常、丢弃任务或运行其他处理逻辑。

当选择阻塞队列和拒绝策略时,您需要根据应用程序的需求和性能特征来决定。以下是一些示例场景,以及在这些场景中可能使用的阻塞队列和拒绝策略:

  1. 高吞吐量、计算密集型任务:在这种场景下,您可能希望使用一个固定大小的线程池,以避免过多的上下文切换和资源竞争。此外,您可以使用有界阻塞队列(如 ArrayBlockingQueue)来限制任务队列的大小,以防止内存溢出。在这种情况下,一个合适的拒绝策略可能是 ThreadPoolExecutor.CallerRunsPolicy,它让调用者线程执行被拒绝的任务,从而降低任务提交的速度。
  2. 可扩展的、I/O密集型任务:对于 I/O 密集型任务,您可以使用较大的线程池,因为线程在等待 I/O 时可能会被阻塞。这种情况下,可以使用 LinkedBlockingQueue 作为阻塞队列,因为它具有较好的性能特性且可以容纳更多任务。拒绝策略可以选择 ThreadPoolExecutor.CallerRunsPolicyThreadPoolExecutor.DiscardOldestPolicy,后者会丢弃队列中等待时间最长的任务,为新任务腾出空间。
  3. 低延迟、实时性要求高的任务:对于实时性要求高的任务,您可能希望尽快处理新请求。在这种情况下,可以选择一个动态调整大小的线程池(如 Executors.newCachedThreadPool())。对于阻塞队列,可以选择 SynchronousQueue,它不存储任务,而是直接将任务从生产者传递给消费者。拒绝策略可以选择 ThreadPoolExecutor.AbortPolicy,它会在队列满时抛出异常,通知调用者任务被拒绝。
  4. 具有优先级的任务处理:如果您的应用程序需要处理具有不同优先级的任务,可以使用 PriorityBlockingQueue 作为阻塞队列。这种队列根据任务的优先级对其进行排序,确保高优先级任务先被执行。拒绝策略可以根据上述场景选择合适的策略。

5. 在 Java中,Main方法 6秒,Method A, B, C 方法独自执行,让 Main 方法2秒执行?

提示 CompletableFuture 实现

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Example {
public static void main(String[] args) {
CompletableFuture<Void> futureA = CompletableFuture.runAsync(() -> methodA());
CompletableFuture<Void> futureB = CompletableFuture.runAsync(() -> methodB());
CompletableFuture<Void> futureC = CompletableFuture.runAsync(() -> methodC());

try {
CompletableFuture.allOf(futureA, futureB, futureC).get(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// handle InterruptedException
} catch (ExecutionException e) {
// handle ExecutionException
} catch (TimeoutException e) {
// handle TimeoutException
System.out.println("Main method timed out after 2 seconds.");
}
}

public static void methodA() {
// do something
}

public static void methodB() {
// do something
}

public static void methodC() {
// do something
}
}

Follow-up: 如果 C 依赖于 B 执行, B 依赖于 A 执行

如果方法C依赖于方法B的执行结果,而方法B又依赖于方法A的执行结果,可以使用**CompletableFuturethenApply()**方法来实现依赖关系的链式调用。

具体来说,可以先创建一个**CompletableFuture对象,表示方法A的执行结果。然后,调用thenApply()方法,将方法B作为一个Lambda表达式传递给该方法,这样当方法A执行完成后,方法B就会自动启动执行。最后,再次调用thenApply()**方法,将方法C作为一个Lambda表达式传递给该方法,这样当方法B执行完成后,方法C就会自动启动执行。

import java.util.concurrent.CompletableFuture;

public class Main {
public static void main(String[] args) {
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> methodA());
CompletableFuture<Void> futureB = futureA.thenAcceptAsync(resultA -> methodB(resultA));
futureB.join();
}

public static String methodA() {
// do something and return a String
System.out.println("This is methodA");
return "return result A";
}

public static void methodB(String resultA) {
// do something with resultA
System.out.println(resultA);
System.out.println("This is methodB");

}
}

// output
// This is methodA
// return result A
// This is methodB

6. When using a thread pool in your project, how to set the number of core threads?

CPU Core Count

  • General Guideline: Do not exceed the number of physical or virtual CPU cores with the core thread count to avoid unnecessary context switching and CPU resource contention. 核心线程数不要超过服务器机器的CPU核心数,以避免不必要的上下文切换和CPU资源的竞争.
  • When using containers for services, check how many vCPUs are given, and set your thread numbers based on that to ensure smooth operations. 当使用容器时,容器所分配的 vCPU 来进行线程数量的计算

Example Scenario

  • System: 4 CPU cores.
  • Task Type: I/O-intensive.
  • Common Practice: Set the core thread count to 2x or 3x the number of CPU cores (i.e., 8 or 12 in this example) to ensure CPU is utilized effectively while waiting for I/O operations to complete.

Task Types

  • CPU-Intensive Tasks: If tasks are primarily CPU-bound, typically set the core thread count equal to the available CPU cores (or slightly more) to maximize CPU utilization.
  • I/O-Intensive Tasks: If tasks are primarily I/O-bound (e.g., file read/write, network I/O, etc.), you might want to increase the core thread count since threads won’t occupy the CPU while waiting for I/O operations to complete.

7. How to use CompletableFuture to write a simple demo?

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureDemo {

public static void main(String[] args) {

// 定义第一个API请求的CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
// 模拟网络请求
Thread.sleep(3000);
return "Hello";
} catch (InterruptedException e) {
e.printStackTrace();
return "";
}
});

// 定义第二个API请求的CompletableFuture
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
// 模拟网络请求
Thread.sleep(2000);
return " World";
} catch (InterruptedException e) {
e.printStackTrace();
return "";
}
});

// 将两个CompletableFuture组合起来
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

// 等待异步操作完成并输出结果
try {
String result = combinedFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}