[译] 管中窥豹:RxJava 与 Kotlin 协程的对比,OMG学它

相比 RxJava 的函数式,用协程写出来的代码逻辑更简洁,而且代码看起来就像是线性和同步的一样。

suspend fun coroutineWay() {
val t0 = System.currentTimeMillis()

var i = 0;
while (true) { // (1)
println(“Attempt " + (i + 1) + " at T=” +
(System.currentTimeMillis() - t0))

var v1 = async(CommonPool) { f1(i) } // (2)
var v2 = async(CommonPool) { f2(i) }

var v3 = launch(CommonPool) { // (3)
Thread.sleep(500)
println(" Cancelling at T=" +
(System.curr
entTimeMillis() - t0))
val te = TimeoutException();
v1.cancel(te); // (4)
v2.cancel(te);
}

try {
val r1 = v1.await(); // (5)
val r2 = v2.await();
v3.cancel(); // (6)
println(r1 + r2)
break;
} catch (ex: TimeoutException) { // (7)
println(" Crash at T=" +
(System.currentTimeMillis() - t0))
if (++i > 2) { // (8)
throw ex;
}
}
}
println(“End at T=”

  • (System.currentTimeMillis() - t0)) // (9)

}

添加的一些输出是用来观察这段代码如何运行的。

  1. 通常线性编程的情况下,是没有直接重试某个操作的快捷方法的,因此,我们需要建立一个循环以及重试计数器 i
  2. 通过 async(CommonPool) 来执行异步操作,该函数可以在一些后台线程立即启动并执行函数。该函数会返回一个 Deferred,稍后会用到这个值。 如果用 await() 来得到 v1 作为最终值的话,当前线程将会挂起,另外,对 v2 的计算也不会开始,除非前一个恢复执行。除此以外,我们还需要在超时的情况下取消当前操作的方法。参考步骤 3 和 5。
  3. 如果想让两个操作都超时的话,看起来我们只能在另一个异步线程中执行等待操作。launch(CommonPool) 方法会返回一个可以用在这种情况下的 Job 对象。 与 async 的区别是,这样执行无法返回值。之所以保存返回的 Job 是因为先前的异步操作可能及时返回,就不再需要取消操作了。
  4. 在超时的任务中,我们用 TimeoutException 来取消 v1v2 ,这将恢复任何已经挂起来等待二者返回的操作。
  5. 等待两个函数运行结果。如果超时,await 将重新扔出在第四步中使用的异常。
  6. 如果没有异常,则取消不再需要执行的超时任务,并跳出循环。
  7. 如果有超时,则走老一套捕获异常并执行状态检查来确定下一步操作。注意任何其他异常都会直接被抛出并退出循环。
  8. 万一是第三次或更多次的尝试,直接扔出异常,什么都不做。
  9. 如果一切按剧本走,打印运行的总时间,然后退出当前函数。

看起来挺简单的,尽管取消机制可能搞个大新闻:如果 v2 因为其他异常(比如网络原因导致的 IOException)崩溃了呢?当然我们得处理这些情况来确保任务可以在各种情况下被取消(举个栗子,试试 Kotlin 中的资源?)。然而,这种情况发生的背景是 v1 会及时返回,直到尝试 await 之前都无法取消 v1 或检测 v2 的崩溃。

不要在意那些细节,反正程序跑起来了,运行结果如下:

Attempt 1 at T=0
Cancelling at T=531
Crash at T=2017
Attempt 2 at T=2017
Cancelling at T=2517
Crash at T=4026
Attempt 3 at T=4026
3
End a

一共进行了 3 次尝试,最后一次成功了,值是 3。是不是和剧本一模一样的?一点都不快(此处有双关(译者并没有看出来哪里有双关))! 我们可以看到取消事件发生的大概时间,两次不成功的请求之后大约 500 ms ,然而异常捕获发生在大约 2000 ms 之后!我们知道 cancel() 被成功调用是因为我们捕获了异常。然而,看起来函数中的 Thread.sleep() 并没有被打断,或者用协程的说法,没有在打断异常时恢复。这可能是 CommonPool 的一部分,对 Future.cancel(false) 的调用处于基础结构中,抑或只是简单的程序限制。

响应式

接下来我们看看 RxJava 2 是如何实现相同操作的。让人失望的是,如果函数前加了 suspended,就无法通过普通方式调用了,所以我们还得用普通方法重写一下两个函数:

fun f3(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 1
}

fun f4(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 2
}

为了匹配阻塞外部环境的功能,我们采用  RxJava 2 Extensions 中的 BlockingScheduler 来提供返回到主线程的功能。顾名思义,它阻塞了一开始的调用者/主线程,直到有任务通过调度器来提交并运行。

fun reactiveWay() {
RxJavaPlugins.setErrorHandler({ }) // (1)

val sched = BlockingScheduler() // (2)
sched.execute {
val t0 = System.currentTimeMillis()
val count = Array(1, { 0 }) // (3)

Single.defer({ // (4)
val c = count[0]++;
println(“Attempt " + (c + 1) +
" at T=” + (System.currentTimeMillis() - t0))

Single.zip( // (5)
Single.fromCallable({ f3© })
.subscribeOn(Schedulers.io()),
Single.fromCallable({ f4© })
.subscribeOn(Schedulers.io()),
BiFunction<Int, Int> { a, b -> a + b } // (6)
)
})
.doOnDispose({ // (7)
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
})
.timeout(500, TimeUnit.MILLISECONDS) // (8)
.retry({ x, e ->
println(" Crash at " +
(System.currentTimeMillis() - t0))
x < 3 && e is TimeoutException // (9)
})
.doAfterTerminate { sched.shutdown() } // (10)
.subscribe({
println(it)
println(“End at T=” +
(System.currentTimeMillis() - t0)) // (11)
},
{ it.printStackTrace() })
}
}

实现起来有点长,对那些不熟悉 lambda 的人来说看起来可能有点可怕。

  1. 众所周知 RxJava 2 无论如何都会传递异常。在 Android 上,无法传递的异常会使应用崩溃,除非使用 RxJavaPlugins.setErrorHandler 来捕获。在此,因为我们知道取消事件会打断 Thread.sleep() ,调用栈打出来的结果只会是一团乱麻,我们也不会去注意这么多的异常。
  2. 设置 BlockingScheduler 并分发第一个执行的任务,以及剩下的主线程执行逻辑。 这是由于一旦锁住, start() 将会给主线程增加一个活锁状态,直到有任何随后事件打破锁定,主线程才会继续执行。
  3. 设置一个堆变量来记录重试次数。
  4. 一旦有通过 Single.defer 的订阅,计数器加一并打印 “Attempt” 字符串。该操作符允许保留每个订阅的状态,这正是我们在下游执行的 retry() 操作符所期望的。
  5. 使用 zip 操作符来异步执行两个元素的计算,二者都在后台线程执行自己的函数。
  6. 当二者都完成时,将结果相加。
  7. 为了让超时取消,使用 doOnDispose 操作符来打印当前状态和时间。
  8. 使用 timeout 操作符定义求和的超时。如果超时则会发送 TimeoutException(例如该场景下没有反馈时)。
  9. retry 操作符的重载提供了重试时间以及当前错误。打印错误后,应该返回 true ——也就是说必须执行重试——如果重试次数小于三并且当前错误是 TimeoutException 的话。任何其他错误只会终止而不是触发重试。
  10. 一旦完成,我们需要关闭调度器,来让释放主线程并退出JVM。
  11. 当然,在完成前我们需要打印求和结果以及整个操作的耗时。

可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:

Cancelling at T=4527

Attempt 1 at T=72
Cancelling at T=587
Crash at 587
Attempt 2 at T=587
Cancelling at T=1089
耗时。

可能有人说,这比协程的实现复杂多了。不过……至少跑起来了:

Cancelling at T=4527

Attempt 1 at T=72
Cancelling at T=587
Crash at 587
Attempt 2 at T=587
Cancelling at T=1089


版权声明:本文为m0_66264699原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。