Java 8中异步处理

Java8中提供了CompletableFeature工具,提供了一系列基于函数式的api。本篇主要介绍这些api的使用以及源码实现。

CompletableFeature.supplyAsyc()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//1.输入一个supplier函数,输出一个CompletableFeature<U>引用
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//2.传入一个线程池,一个supplier函数,输出一个CompletableFeature<U>引用
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
//异步调用之后执行return d
e.execute(new AsyncSupply<U>(d, f));
//主线程直接返回创建的对象
return d;
}
//3.线程池中线程异步执行supplier函数
@SuppressWarnings("serial")
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
//初始化时用主线程CompletableFeature对象引用
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}

public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }

public void run() {
//这里的 d引用==线程初始化对象引用dep==主线程对象引用
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//异步线程内部同步执行supplier函数,f.get返回类型和泛型类型保持一致
d.completeValue(f.get());
} catch (Throwable ex) {
//发生异常将异常作为结果对象
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}

//4.d.completeValue通过unsafe cas替换,到这里异步线程执行完成并将执行结果记录在volatile object result对象中
final boolean completeValue(T t) {
//入参this(object1),result(volatile object类型),null(拷贝旧值),t(返回新值)
return UNSAFE.compareAndSwapObject(this, RESULT, null,(t == null) ? NIL : t);
}
//5.这里的RESULT
private static final sun.misc.Unsafe UNSAFE;
// 为什么是long类型,暂时理解为内存地址吧。。
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
try {
final sun.misc.Unsafe u;
UNSAFE = u = sun.misc.Unsafe.getUnsafe();
Class<?> k = CompletableFuture.class;
//获取声明字段的内存地址
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
NEXT = u.objectFieldOffset
(Completion.class.getDeclaredField("next"));
} catch (Exception x) {
throw new Error(x);
}
}

主线程循环等待 volatile object result返回CompletableFeature.get()。同时可以设置超时时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
//就这一句判断,反正就是不给我我就循环,占着cpu,你说难受不难受,如果不设置超时时间异步线程不返回,就死循环卡着了,是不是很伤。内部应该处理了一些什么东西,避免死循环占用cpu,可以换成阻塞式,让出cpu。
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
//这里之前死循环,spins == 1<<8 === 256 次,进行256次死循环。检查result,如果这段时间没有返回。执行到下面的代码。
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
//这里会进入然后一直阻塞,让出cpu,知道result返回之后才返回继续执行一次循环,然后退出循环。
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}

附上流程图吧:

cmd-markdown-logo

推荐设置超时时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Created by yqz on 11/20/18.
*/
public class CompleteFeatureTest {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> stringCompletableFuture=CompletableFuture.supplyAsync(()->{
try {
//定义的body,默认覆盖,定义一个supplier函数异步调用,模拟网络调用
Thread.sleep(1000000);
} catch (InterruptedException e) {
throw new RuntimeException("同步异常");
} finally {
//return 的类型为定义的模板类型
return null;
}
});

try {
String string= null;
try {
string = stringCompletableFuture.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println("getResultString:"+string);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

CompletableFeature.runAsync()

1
2
3
CompletableFuture.runAsync(() ->
System.out.println("CompleteFeature runAsync...")
);

异步调用线程,不关心返回结果,这里入参是一个lamada表达式。Java8之前的写法是匿名内部类的方式:

1
2
3
4
5
6
7
8
9
CompletableFuture.runAsync(
//java8 之后不推荐这么写,采用lamada表达式
new Runnable() {
@Override
public void run() {
System.out.println("CompleteFeature runAsync...");
}
}
);

这个函数调用不关心返回结果。

CompletableFeature.runAsync().whenComplete()/.whenCompleteAsync

  • 1.CompletableFeature.runAsync()开辟一个异步线程a,主线程直接返回CompletableFeature类型的一个引用,不关心a的执行结果。
  • 2.主线程用CompletableFeature的引用调用.whenComplete()直接返回CompletableFeature的引用,a线程执行完成之后,执行whenComplete()中的BiConsumer方法,完全异步,不干扰主线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.CompletableFuture;
/**
* Created by yqz on 11/20/18.
*/
public class CompleteFeatureTest {
public static void main(String[] args) throws InterruptedException {
CompletableFuture<?> completableFuture=CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + " CompleteFeature runAsync...");
}).whenComplete((t, e) ->{
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + " CompleteFeature runAsync...complete");
});
CompletableFuture<?> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + " CompleteFeature runAsync...1"); }).whenCompleteAsync((t, u) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + " CompleteFeature runAsync...complete...1");
});
CompletableFuture<?> all=CompletableFuture.allOf(completableFuture,completableFuture1);
all.whenComplete(
(v,e)->
System.out.println("all completed")
);
try {
all.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
10   CompleteFeature runAsync...
11 CompleteFeature runAsync...1
10 CompleteFeature runAsync...complete
11 CompleteFeature runAsync...complete...1
all completed

Process finished with exit code 0