多线程CompletableFuture 和 parallelStream使用

CompletableFuture 和 parallelStream 都是 Java 中用于并发执行任务的工具,属于 异步编程/并行计算 的范畴,但它们的使用场景、机制和控制能力各有不同。 ✅ 一、 Completabl…


CompletableFuture 和 parallelStream 都是 Java 中用于并发执行任务的工具,属于 异步编程/并行计算 的范畴,但它们的使用场景、机制和控制能力各有不同。

✅ 一、

CompletableFuture

使用及注意事项

🔹 1. 简介

CompletableFuture 是 Java 8 引入的用于实现异步编程的类,属于 java.util.concurrent 包,可以:

  • 异步执行任务
  • 组合多个异步任务
  • 设置任务完成后的回调
  • 更灵活地控制线程池

🔹 2. 常用方法

CompletableFuture.supplyAsync(() -> {
    // 有返回值的异步任务
    return "hello";
});

CompletableFuture.runAsync(() -> {
    // 无返回值的异步任务
    System.out.println("run");
});

连续执行:

CompletableFuture.supplyAsync(() -> {
    return 10;
}).thenApply(result -> {
    return result * 2;
}).thenAccept(System.out::println); // 输出 20

组合任务:

CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> 20);

f1.thenCombine(f2, Integer::sum)
  .thenAccept(System.out::println); // 输出 30

自定义线程池:

ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture.supplyAsync(() -> "data", executor);

⚠️ 注意事项

项目 说明

异常处理 推荐使用 .exceptionally() 或 .handle() 处理异常,防止线程崩溃

线程池管理 不建议使用默认线程池(ForkJoinPool),应自定义线程池以便控制并发资源

任务阻塞 使用 .join() 或 .get() 会阻塞当前线程,不建议在主线程或Web线程中调用

链式执行 .thenApply() 是同步执行、.thenApplyAsync() 是异步执行

组合过多任务 可以用 CompletableFuture.allOf() 合并多个异步任务,避免嵌套地狱

✅ 二、

parallelStream

使用及注意事项

🔹 1. 简介

parallelStream 是 Java 8 中 Stream 接口提供的并行处理功能,底层使用 ForkJoinPool.commonPool() 进行任务分发。

List<String> list = Arrays.asList("a", "b", "c");
list.parallelStream().forEach(System.out::println);

🔹 2. 适用场景

  • 数据量较大
  • CPU密集型任务(如数学计算、压缩加密)
  • 操作无副作用(无共享变量)

⚠️ 注意事项

项目 说明

线程池不可控 使用的是公共的 ForkJoinPool.commonPool(),难以调优或隔离任务

不可预测顺序 parallelStream().forEach() 是无序的,用 .forEachOrdered() 保持顺序

线程安全问题 不要在并行流中修改共享变量,如 List.add(),需用线程安全结构或 collect()

性能不一定更好 小数据量时反而更慢(线程调度成本高于串行),需评估后使用

不适合 I/O 密集 如网络、数据库操作,使用线程池 + CompletableFuture 更合适

✅ 三、选择建议

使用场景 推荐工具

并行处理 List,无共享变量 parallelStream

控制线程数、复杂任务链 CompletableFuture

I/O 密集型任务(网络/数据库) CompletableFuture + 自定义线程池

有返回值或任务组合 CompletableFuture.supplyAsync().thenCombine()

✅ 四、实际案例对比

1️⃣ parallelStream 示例

List<Integer> data = IntStream.range(1, 100).boxed().collect(Collectors.toList());
data.parallelStream()
    .map(i -> i * 2)
    .forEach(System.out::println);

2️⃣ CompletableFuture 并行示例

List<Integer> data = IntStream.range(1, 100).boxed().collect(Collectors.toList());
ExecutorService executor = Executors.newFixedThreadPool(10);

List<CompletableFuture<Void>> futures = data.stream()
    .map(i -> CompletableFuture.runAsync(() -> {
        System.out.println(Thread.currentThread().getName() + ": " + i * 2);
    }, executor))
    .collect(Collectors.toList());

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executor.shutdown();

📌 总结

对比项 CompletableFuture parallelStream

控制性 强(可定义线程池、异常处理) 弱(用公共线程池)

并发类型 适合 IO 密集或复杂业务 适合 CPU 密集型任务

异步组合能力 支持 then、combine 等链式调用 不支持复杂任务组合

开发复杂度 略高 简单

评论