相比 RxJava 的函数式,用协程写出来的代码逻辑更简洁,而且代码看起来就像是线性和同步的一样。
suspend fun coroutineWay() {
val t0 = System.currentTimeMillis()
var i = 0;
while (true) { // (1)
println(“Attempt " + (i + 1) + " at T=” +
(System.currentTimeMillis() - t0))
var v1 = async(CommonPool) { f1(i) } // (2)
var v2 = async(CommonPool) { f2(i) }
var v3 = launch(CommonPool) { // (3)
Thread.sleep(500)
println(" Cancelling at T=" +
(System.curr
entTimeMillis() - t0))
val te = TimeoutException();
v1.cancel(te); // (4)
v2.cancel(te);
}
try {
val r1 = v1.await(); // (5)
val r2 = v2.await();
v3.cancel(); // (6)
println(r1 + r2)
break;
} catch (ex: TimeoutException) { // (7)
println(" Crash at T=" +
(System.currentTimeMillis() - t0))
if (++i > 2) { // (8)
throw ex;
}
}
}
println(“End at T=”
- (System.currentTimeMillis() - t0)) // (9)
}
添加的一些输出是用来观察这段代码如何运行的。
- 通常线性编程的情况下,是没有直接重试某个操作的快捷方法的,因此,我们需要建立一个循环以及重试计数器 i。
- 通过 async(CommonPool) 来执行异步操作,该函数可以在一些后台线程立即启动并执行函数。该函数会返回一个 Deferred,稍后会用到这个值。 如果用 await() 来得到 v1 作为最终值的话,当前线程将会挂起,另外,对 v2 的计算也不会开始,除非前一个恢复执行。除此以外,我们还需要在超时的情况下取消当前操作的方法。参考步骤 3 和 5。
- 如果想让两个操作都超时的话,看起来我们只能在另一个异步线程中执行等待操作。launch(CommonPool) 方法会返回一个可以用在这种情况下的 Job 对象。 与 async 的区别是,这样执行无法返回值。之所以保存返回的 Job 是因为先前的异步操作可能及时返回,就不再需要取消操作了。
- 在超时的任务中,我们用 TimeoutException 来取消 v1 和 v2 ,这将恢复任何已经挂起来等待二者返回的操作。
- 等待两个函数运行结果。如果超时,await 将重新扔出在第四步中使用的异常。
- 如果没有异常,则取消不再需要执行的超时任务,并跳出循环。
- 如果有超时,则走老一套捕获异常并执行状态检查来确定下一步操作。注意任何其他异常都会直接被抛出并退出循环。
- 万一是第三次或更多次的尝试,直接扔出异常,什么都不做。
- 如果一切按剧本走,打印运行的总时间,然后退出当前函数。
看起来挺简单的,尽管取消机制可能搞个大新闻:如果 v2 因为其他异常(比如网络原因导致的 IOException)崩溃了呢?当然我们得处理这些情况来确保任务可以在各种情况下被取消(举个栗子,试试 Kotlin 中的资源?)。然而,这种情况发生的背景是 v1 会及时返回,直到尝试 await 之前都无法取消 v1 或检测 v2 的崩溃。
不要在意那些细节,反正程序跑起来了,运行结果如下:
Attempt 1 at T=0
Cancelling at T=531
Crash at T=2017
Attempt 2 at T=2017
Cancelling at T=2517
Crash at T=4026
Attempt 3 at T=4026
3
End a
一共进行了 3 次尝试,最后一次成功了,值是 3。是不是和剧本一模一样的?一点都不快(此处有双关(译者并没有看出来哪里有双关))! 我们可以看到取消事件发生的大概时间,两次不成功的请求之后大约 500 ms ,然而异常捕获发生在大约 2000 ms 之后!我们知道 cancel() 被成功调用是因为我们捕获了异常。然而,看起来函数中的 Thread.sleep() 并没有被打断,或者用协程的说法,没有在打断异常时恢复。这可能是 CommonPool 的一部分,对 Future.cancel(false) 的调用处于基础结构中,抑或只是简单的程序限制。
响应式
接下来我们看看 RxJava 2 是如何实现相同操作的。让人失望的是,如果函数前加了 suspended,就无法通过普通方式调用了,所以我们还得用普通方法重写一下两个函数:
fun f3(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 1
}
fun f4(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 2
}
为了匹配阻塞外部环境的功能,我们采用 RxJava 2 Extensions 中的 BlockingScheduler 来提供返回到主线程的功能。顾名思义,它阻塞了一开始的调用者/主线程,直到有任务通过调度器来提交并运行。
fun reactiveWay() {
RxJavaPlugins.setErrorHandler({ }) // (1)
val sched = BlockingScheduler() // (2)
sched.execute {
val t0 = System.currentTimeMillis()
val count = Array(1, { 0 }) // (3)
Single.defer({ // (4)
val c = count[0]++;
println(“Attempt " + (c + 1) +
" at T=” + (System.currentTimeMillis() - t0))
Single.zip( // (5)
Single.fromCallable({ f3© })
.subscribeOn(Schedulers.io()),
Single.fromCallable({ f4© })
.subscribeOn(Schedulers.io()),
BiFunction<Int, Int> { a, b -> a + b } // (6)
)
})
.doOnDispose({ // (7)
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
})
.timeout(500, TimeUnit.MILLISECONDS) // (8)
.retry({ x, e ->
println(" Crash at " +
(System.currentTimeMillis() - t0))
x < 3 && e is TimeoutException // (9)
})
.doAfterTerminate { sched.shutdown() } // (10)
.subscribe({
println(it)
println(“End at T=” +
(System.currentTimeMillis() - t0)) // (11)
},
{ it.printStackTrace() })
}
}
实现起来有点长,对那些不熟悉 lambda 的人来说看起来可能有点可怕。
- 众所周知 RxJava 2 无论如何都会传递异常。在 Android 上,无法传递的异常会使应用崩溃,除非使用 RxJavaPlugins.setErrorHandler 来捕获。在此,因为我们知道取消事件会打断 Thread.sleep() ,调用栈打出来的结果只会是一团乱麻,我们也不会去注意这么多的异常。
- 设置 BlockingScheduler 并分发第一个执行的任务,以及剩下的主线程执行逻辑。 这是由于一旦锁住, start() 将会给主线程增加一个活锁状态,直到有任何随后事件打破锁定,主线程才会继续执行。
- 设置一个堆变量来记录重试次数。
- 一旦有通过 Single.defer 的订阅,计数器加一并打印 “Attempt” 字符串。该操作符允许保留每个订阅的状态,这正是我们在下游执行的 retry() 操作符所期望的。
- 使用 zip 操作符来异步执行两个元素的计算,二者都在后台线程执行自己的函数。
- 当二者都完成时,将结果相加。
- 为了让超时取消,使用 doOnDispose 操作符来打印当前状态和时间。
- 使用 timeout 操作符定义求和的超时。如果超时则会发送 TimeoutException(例如该场景下没有反馈时)。
- retry 操作符的重载提供了重试时间以及当前错误。打印错误后,应该返回 true ——也就是说必须执行重试——如果重试次数小于三并且当前错误是 TimeoutException 的话。任何其他错误只会终止而不是触发重试。
- 一旦完成,我们需要关闭调度器,来让释放主线程并退出JVM。
- 当然,在完成前我们需要打印求和结果以及整个操作的耗时。
可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:
Cancelling at T=4527
Attempt 1 at T=72
Cancelling at T=587
Crash at 587
Attempt 2 at T=587
Cancelling at T=1089
耗时。
可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:
Cancelling at T=4527
Attempt 1 at T=72
Cancelling at T=587
Crash at 587
Attempt 2 at T=587
Cancelling at T=1089