评论

收藏

[Java] Java8 自定义CompletableFuture的原理解析

编程语言 编程语言 发布于:2022-03-11 11:04 | 阅读数:457 | 评论:0

这篇文章主要介绍了Java8 自定义CompletableFuture的原理解析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
目录

  • Java8 自定义CompletableFuture原理
  • CompleteFuture简单使用

    • 下面简单介绍用法


Java8 自定义CompletableFuture原理
Future 接口 的局限性有很多,其中一个就是需要主动的去询问是否完成,如果等子线程的任务完成以后,通知我,那岂不是更好?
public class FutureInAction3 {
  public static void main(String[] args) {
    Future<String> future = invoke(() -> {
      try {
        Thread.sleep(10000L);
        return "I am Finished.";
      } catch (InterruptedException e) {
        return "I am Error";
      }
    });
    future.setCompletable(new Completable<String>() {
      @Override
      public void complete(String s) {
        System.out.println("complete called ---- " + s);
      }
      @Override
      public void exception(Throwable cause) {
        System.out.println("error");
        cause.printStackTrace();
      }
    });
    System.out.println("....do something else .....");
    System.out.println("try to get result ->" + future.get());
  }
  private static <T> Future<T> invoke(Callable<T> callable) {
    AtomicReference<T> result = new AtomicReference<>();
    AtomicBoolean finished = new AtomicBoolean(false);
    Future<T> future = new Future<T>() {
      private Completable<T> completable;
      @Override
      public T get() {
        return result.get();
      }
      @Override
      public boolean isDone() {
        return finished.get();
      }
      // 设置完成
      @Override
      public void setCompletable(Completable<T> completable) {
        this.completable = completable;
      }
      // 获取
      @Override
      public Completable<T> getCompletable() {
        return completable;
      }
    };
    Thread t = new Thread(() -> {
      try {
        T value = callable.action();
        result.set(value);
        finished.set(true);
        if (future.getCompletable() != null)
          future.getCompletable().complete(value);
      } catch (Throwable cause) {
        if (future.getCompletable() != null)
          future.getCompletable().exception(cause);
      }
    });
    t.start();
    return future;
  }
  private interface Future<T> {
    T get();
    boolean isDone();
    //  1
    void setCompletable(Completable<T> completable);
    //  2
    Completable<T> getCompletable();
  }
  private interface Callable<T> {
    T action();
  }
  // 回调接口
  private interface Completable<T> {
    void complete(T t);
    void exception(Throwable cause);
  }
}
DSC0000.png


CompleteFuture简单使用
Java8 中的 completeFuture 是对 Future 的扩展实现, 主要是为了弥补 Future 没有相应的回调机制的缺陷.
我们先看看 Java8 之前的 Future 的使用
package demos;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * @author djh on  2019/4/22 10:23
 * @E-Mail 1544579459@qq.com
 */
public class Demo {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService cachePool = Executors.newCachedThreadPool();
    Future<String> future = cachePool.submit(() -> {
      Thread.sleep(3000);
      return "异步任务计算结果!";
    });
    // 提交完异步任务后, 主线程可以继续干一些其他的事情.
    doSomeThingElse();
    // 为了获取异步计算结果, 我们可以通过 future.get 和 轮询机制来获取.
    String result;
    // Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷.
    // result = future.get();
    // 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载.
    long start = System.currentTimeMillis();
    while (true) {
      if (future.isDone()) {
        break;
      }
    }
    System.out.println("轮询耗时:" + (System.currentTimeMillis() - start));    
    result = future.get();
    System.out.println("获取到异步计算结果啦: " + result);
    cachePool.shutdown();
  }
  private static void doSomeThingElse() {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.");
  }
}
输出:
我的最重要的事情干完了, 我要获取异步计算结果来执行剩下的事情.
轮询耗时:2000
获取到异步计算结果啦: 异步任务计算结果!
Process finished with exit code 0
从上面的 Demo 中我们可以看出, future 在执行异步任务时, 对于结果的获取显的不那么优雅, 很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果, 如Google的: ListenableFuture, 而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API.

下面简单介绍用法
package demos;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author djh on  2019/5/1 20:26
 * @E-Mail 1544579459@qq.com
 */
public class CompleteFutureDemo {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
    ExecutorService cachePool = Executors.newCachedThreadPool();
    cachePool.execute(() -> {
      try {
        Thread.sleep(3000);
        completableFutureOne.complete("异步任务执行结果");
        System.out.println(Thread.currentThread().getName());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    // WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果.
    CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {
      System.out.println("当异步任务执行完毕时打印异步任务的执行结果: " + s);
    });
    // ThenApply 方法返回的是一个新的 completeFuture.
    CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
      System.out.println("当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return s.length();
    });
    System.out.println("阻塞方式获取执行结果:" + completableFutureThree.get());
    cachePool.shutdown();
  }
}
从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法, 这两个方法便是 CompleteFuture 中最具有意义的方法, 他们都会在 completeFuture 调用 complete 方法传入异步计算结果时回调, 从而获取到异步任务的结果.
相比之下 future 的阻塞和轮询方式获取异步任务的计算结果, CompleteFuture 获取结果的方式就显的优雅的多。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持CodeAE代码之家
原文链接:https://artisan.blog.csdn.net/article/details/115450097

关注下面的标签,发现更多相似文章