Kotlin学习手记——协程进阶

作用域:
在这里插入图片描述
顶级:
在这里插入图片描述
在这里插入图片描述
coroutineScope表示协同作用域,coroutineScope内部的协程出现异常可以挂掉外部协程,会向外部传播,外部协程挂掉也会挂掉子协程,即双向传播。
在这里插入图片描述
supervisorScope表示主从作用域,supervisorScope内部的协程挂掉不会影响外部的协程继续运行,它就像一道防火墙,隔离了异常,保证程序健壮,但是如果外部协程挂掉还是可以取消子协程的,即单向传播。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
简单总结就是,主从关系:无法坑爹,爹可以坑儿子。协同关系:可以坑爹,可以坑儿子,互相坑。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
如果是应用的话,主要掌握框架级别的使用即可,语言级别的支持api来源于标准库,写起来比较麻烦也非常难理解。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这里launch会进行一次调度 ,delay会进行一次调度,每次调度完成会执行一次resume, 最终协程体执行完毕会执行一次resume, 所以内部有n个挂起点的协程体会执行n+2次resume.
在这里插入图片描述
DEFAULT 立即开始调度 和 UNDISPATCHED 立即开始执行协程体,这两个含义的区别是 DEFAULT 只是立即启动协程执行可能是异步的,而后者是直接执行协程体中的代码了。LAZY 是先创建协程体,然后在未来的某个时刻才去启动执行。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
UNDISPATCHED 立即开始执行协程体,如果遇到挂起点,就切回主流程了,后面的协程体继续执行在单独的调度器。

import kotlinx.coroutines.*


@ExperimentalCoroutinesApi
suspend fun main() {
    println("start")
    testDefaultMode()
//    testAtomicMode()
//    testLazyMode()
//    testUNDISPATCHEDMode()
    println("finish")
}

suspend fun testDefaultMode() {
    val defaultMode = GlobalScope.launch(start = CoroutineStart.DEFAULT) {
        println("aaa")
        delay(3000)
        println("bbb")
    }
    println("222")
    defaultMode.join()
}

@ExperimentalCoroutinesApi
suspend fun testAtomicMode() {
    val defaultMode = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
        println("aaa")
        delay(3000)
        println("bbb")
    }
    println("222")
    defaultMode.join()
}

suspend fun testLazyMode() {
    val defaultMode = GlobalScope.async(start = CoroutineStart.LAZY) {
        println("aaa")
        delay(3000)
        println("bbb")
    }
    println("222")
    defaultMode.await()
}

@ExperimentalCoroutinesApi
suspend fun testUNDISPATCHEDMode() {
    val defaultMode = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
        println("aaa")
        delay(3000)
        println("bbb")
    }
    println("222")
    defaultMode.join()
}

在这里插入图片描述
Default和IO线程的区别,IO内部多了一个队列的维护
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

回调转协程的完整写法:

import com.bennyhuo.kotlin.coroutines.advanced.common.gitHubServiceApi
import kotlinx.coroutines.suspendCancellableCoroutine
import retrofit2.Call
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun <T> Call<T>.await(): T = suspendCancellableCoroutine { //可取消
    continuation ->
    continuation.invokeOnCancellation {
        cancel() //调用retrofit的取消方法
    }

    enqueue(object: Callback<T> {
        override fun onFailure(call: Call<T>, t: Throwable) {
            continuation.resumeWithException(t)
        }

        override fun onResponse(call: Call<T>, response: Response<T>) {
            response.takeIf { it.isSuccessful }?.body()?.also {continuation.resume(it) }
                ?: continuation.resumeWithException(HttpException(response))
        }

    })
}



suspend fun main() {
    val user = gitHubServiceApi.getUserCallback("flycumt").await()
    println(user)
}

也可以不自己写,retrofit的api中本身有实现await()方法,awaitResponse()方法等。

CompletableFuture 添加回调的写法:

import com.bennyhuo.kotlin.coroutines.advanced.utils.log
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun main() {
    val result = CompletableFuture.supplyAsync {
        3
    }.await()

    log(result)
}

suspend fun <T> CompletableFuture<T>.await(): T {
    if(isDone){
        try {
            return get()
        } catch (e: ExecutionException) {
            throw e.cause ?: e
        }
    }
    return suspendCancellableCoroutine { //可取消
        cancellableContinuation ->
        cancellableContinuation.invokeOnCancellation {
            cancel(true) //取消
        }

        whenComplete { value, throwable ->
            if(throwable == null){
                cancellableContinuation.resume(value)
            } else {
                cancellableContinuation.resumeWithException(throwable.cause ?: throwable)
            }
        }
    }
}

CompletableFuture本身也有实现await()方法。

模仿给Handler扩展添加可取消的支持:

suspend fun <T> Handler.run(block: () -> T) = suspendCoroutine<T> { continuation ->
    post {
        try {
            continuation.resume(block())
        } catch (e: Exception) {
            continuation.resumeWithException(e)
        }
    }
}

suspend fun <T> Handler.runDelay(delay: Long, block: () -> T) = suspendCancellableCoroutine<T> { continuation ->

    val message = Message.obtain(this) { //Message obtain(Handler h, Runnable callback)
        try {
            continuation.resume(block())
        } catch (e: Exception){
            continuation.resumeWithException(e)
        }
    }.also {
        it.obj = continuation //message.obj
    }

    continuation.invokeOnCancellation {
        removeCallbacksAndMessages(continuation) //通过Handler的removeCallbacksAndMessages方法来取消回调, 参数就是前面设置的message.obj的值
    }

    sendMessageDelayed(message, delay)
}


suspend fun main() {
    Looper.prepareMainLooper()

    GlobalScope.launch {
        val handler = Handler(Looper.getMainLooper())
        val result = handler.run { "Hello" }
        val delayedResult = handler.runDelay(5000){ "World" }
        log(result, delayedResult)
        Looper.getMainLooper().quit()
    }

    Looper.loop()
}

这个例子的主要意图是,Hanlder可以通过定义扩展函数的方式来延时获取一些东西,比如Activity刚创建的时候,拿不到view的宽和高,就可以使用这种方法。

上面三个例子主要是针对可取消的写法,如果实际用,不用自己写,直接导库就行。

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
其中CONFLATED比较适合用于状态更新,比如进度条的进度,因为它总是只取最新的。

在这里插入图片描述
在这里插入图片描述
关闭后再发送会抛异常:
在这里插入图片描述
channel关闭后,channel中的数据仍然可以被接受,只有当channel中的数据消费完了,isClosedForReceive才为true.
在这里插入图片描述

suspend fun main() {
    basics()
}

suspend fun basics() {
    val channel = Channel<Int>(Channel.RENDEZVOUS)
//    val channel = Channel<Int>(Channel.UNLIMITED)
//    val channel = Channel<Int>(Channel.CONFLATED)
//    val channel = Channel<Int>(Channel.BUFFERED)
//    val channel = Channel<Int>(1)

    //生产者 发
    val producer = GlobalScope.launch {
        for (i in 0..3) {
            log("sending", i)
            channel.send(i)
            log("sent", i)
        }
        channel.close()
    }

    //消费者 收
    val consumer = GlobalScope.launch {
        while (!channel.isClosedForReceive) {
            log("receiving")
            val value = channel.receiveOrNull()
            log("received", value)
        }
    }

    producer.join()
    consumer.join()
}

Channel(Channel.RENDEZVOUS ) 的方式是发一个收一个,边发边收,如果没有接受的,发送者会挂起等待,输出如下:
在这里插入图片描述
Channel(Channel.UNLIMITED ) 的方式是全部发送完毕,才会接收到,先发后收,发送者发送完就返回了,不管有没有接受者,输出如下:在这里插入图片描述
Channel(Channel.CONFLATED ) 的方式是不管发了多少个,只能收到最后一个,也是发送完就返回了,不管有没有接受者,输出如下:
在这里插入图片描述
Channel(Channel.BUFFERED ) 的方式也是发送者发送完就返回了,不管有没有接受者,可以指定buffer大小,输出如下:
在这里插入图片描述
Channel(1) 的方式指定管道的容量大小,如果数据超过容量,发送者就会挂起等待,直到有接受者取走数据,发送者才发送下一批数据,
在这里插入图片描述

在这里插入图片描述
channel接受数据的时候可以直接当成迭代器使用:

suspend fun iterateChannel() {
    val channel = Channel<Int>(Channel.UNLIMITED)

    val producer = GlobalScope.launch {
        for (i in 0..3) {
            log("sending", i)
            channel.send(i)
            log("sent", i)
        }
        channel.close()
    }

    val consumer = GlobalScope.launch {
        for (i in channel) {
            log("received: ", i)
        }
    }

    producer.join()
    consumer.join()
}

在这里插入图片描述

suspend fun producer() {
    val receiveChannel = GlobalScope.produce(capacity = Channel.UNLIMITED) {
        for (i in 0..3) {
            log("sending", i)
            send(i)
            log("sent", i)
        }
    }

    val consumer = GlobalScope.launch {
        for (i in receiveChannel) {
            log("received: ", i)
        }
    }

    consumer.join()
}

suspend fun consumer() {
    val sendChannel = GlobalScope.actor<Int>(capacity = Channel.UNLIMITED) {
        for (i in this) {
            log("received: ", i)
        }
    }

    val producer = GlobalScope.launch {
        for (i in 0..3) {
            log("sending", i)
            sendChannel.send(i)
            log("sent", i)
        }
    }

    producer.join()
}

在这里插入图片描述
在这里插入图片描述

suspend fun broadcast() {
    //下面几种都可以创建一个BroadcastChannel
    //val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    //val broadcastChannel = Channel<Int>(Channel.BUFFERED).broadcast()
    val broadcastChannel = GlobalScope.broadcast {
        for (i in 0..5) {
            send(i)
        }
    }

    //启动5个接受者,每个都能收到
    List(5) { index ->
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            for (i in receiveChannel) {
                log("[#$index] received: $i")
            }
        }
    }.joinAll()
}

输出:

> Task :ChannelsKt.main()
21:07:12:924 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 0
21:07:12:924 [DefaultDispatcher-worker-5 @coroutine#6] [#4] received: 0
21:07:12:924 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 0
21:07:12:925 [DefaultDispatcher-worker-4 @coroutine#5] [#3] received: 0
21:07:12:925 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 0
21:07:12:944 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 1
21:07:12:943 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 1
21:07:12:943 [DefaultDispatcher-worker-5 @coroutine#6] [#4] received: 1
21:07:12:944 [DefaultDispatcher-worker-4 @coroutine#5] [#3] received: 1
21:07:12:945 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 1
21:07:12:945 [DefaultDispatcher-worker-2 @coroutine#4] [#2] received: 2
21:07:12:945 [DefaultDispatcher-worker-8 @coroutine#3] [#1] received: 2
21:07:12:945 [DefaultDispatcher-worker-8 @coroutine#3] [#1] received: 3
21:07:12:945 [DefaultDispatcher-worker-7 @coroutine#4] [#2] received: 3
21:07:12:945 [DefaultDispatcher-worker-2 @coroutine#6] [#4] received: 2
21:07:12:946 [DefaultDispatcher-worker-2 @coroutine#6] [#4] received: 3
21:07:12:946 [DefaultDispatcher-worker-8 @coroutine#5] [#3] received: 2
21:07:12:946 [DefaultDispatcher-worker-8 @coroutine#5] [#3] received: 3
21:07:12:946 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 2
21:07:12:946 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 3
21:07:12:946 [DefaultDispatcher-worker-1 @coroutine#3] [#1] received: 4
21:07:12:946 [DefaultDispatcher-worker-3 @coroutine#2] [#0] received: 4
21:07:12:946 [DefaultDispatcher-worker-1 @coroutine#6] [#4] received: 4
21:07:12:947 [DefaultDispatcher-worker-6 @coroutine#5] [#3] received: 4
21:07:12:947 [DefaultDispatcher-worker-6 @coroutine#5] [#3] received: 5
21:07:12:947 [DefaultDispatcher-worker-2 @coroutine#3] [#1] received: 5
21:07:12:947 [DefaultDispatcher-worker-6 @coroutine#2] [#0] received: 5
21:07:12:947 [DefaultDispatcher-worker-2 @coroutine#6] [#4] received: 5
21:07:12:947 [DefaultDispatcher-worker-3 @coroutine#4] [#2] received: 4
21:07:12:947 [DefaultDispatcher-worker-3 @coroutine#4] [#2] received: 5

在这里插入图片描述
在这里插入图片描述
Select的使用场景是多个协程异步执行时,获取最先结束的那个协程结果返回,比如加载图片时,可能从网络获取,也可能从本地获取,这两种可能同时异步执行,使用Select就会优先获取返回比较快的本地结果展示,然后我们再去获取网络最新的更新即可。
在这里插入图片描述
使用例子:

val localDir = File("localCache").also { it.mkdirs() }

val gson = Gson()

fun CoroutineScope.getUserFromApi(login: String) = async(Dispatchers.IO){
    gitHubServiceApi.getUserSuspend(login)
}

fun CoroutineScope.getUserFromLocal(login:String) = async(Dispatchers.IO){
    File(localDir, login).takeIf { it.exists() }?.readText()?.let { gson.fromJson(it, User::class.java) }
}

fun cacheUser(login: String, user: User){
    File(localDir, login).writeText(gson.toJson(user))
}

data class Response<T>(val value: T, val isLocal: Boolean)

suspend fun main() {
    val login = "test"
    GlobalScope.launch {
        val localDeferred = getUserFromLocal(login)
        val remoteDeferred = getUserFromApi(login)

        //val userResponse  = Response(localDeferred.await(), true)

		//select选择优先返回的结果
        val userResponse = select<Response<User?>> {
            localDeferred.onAwait { Response(it, true) }
            remoteDeferred.onAwait { Response(it, false) }
        }

        userResponse.value?.let { log(it) } //获取结果显示 输出
        //如果是本地的结果,重新请求,并缓存本地
        userResponse.isLocal.takeIf { it }?.let {
            val userFromApi = remoteDeferred.await()
            cacheUser(login, userFromApi)
            log(userFromApi)
        }
    }.join()
}

如果有多个异步请求同时返回,select会按顺序取第一个,想要随机的取可以使用selectUnbiased
在这里插入图片描述
select大括号中onAwait的写法等价于 await() 的写法,localDeferred.await(),还有很多操作join send等都是一样的前面加on

在这里插入图片描述
例子:使用channel和select实现统计代码行数

val KotlinFileFilter = { file: File -> file.isDirectory || file.name.endsWith(".kt") }

data class FileLines(val file: File, val lines: Int) {
    override fun toString(): String {
        return "${file.name}: $lines"
    }
}

suspend fun main() {
    val result = lineCounter(File("."))
    log(result)
}

suspend fun lineCounter(root: File): HashMap<File, Int> {
    return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1).asCoroutineDispatcher()
        //使用use自动关闭资源
        .use {
            //withContext是一个挂起函数 返回值是最后一行表达式的值
            withContext(it){
                val fileChannel  = walkFile(root)

                //定义5个同时读取
                val fileLinesChannels = List(5){
                    fileLineCounter(fileChannel)
                }

                resultAggregator(fileLinesChannels)
            }
        }
}

//创建生产者返回ReceiveChannel
fun CoroutineScope.walkFile(root: File): ReceiveChannel<File> {
    return produce(capacity = Channel.BUFFERED) {
        fileWalker(root)
    }
}

//递归过滤kotlin文件并发送文件
suspend fun SendChannel<File>.fileWalker(file: File) {
    if(file.isDirectory){
        file.listFiles()?.filter(KotlinFileFilter)?.forEach { fileWalker(it) }
    } else {
        send(file)
    }
}

//输入File 返回FileLines对象
fun CoroutineScope.fileLineCounter(input: ReceiveChannel<File>): ReceiveChannel<FileLines> {
    return produce(capacity = Channel.BUFFERED) {
        for (file in input){
            //统计行数
            file.useLines {
                send(FileLines(file, it.count())) //发送结果
            }
        }
    }
}

suspend fun CoroutineScope.resultAggregator(channels: List<ReceiveChannel<FileLines>>): HashMap<File, Int> {
    val map = HashMap<File, Int>()
    channels.aggregate {
            filteredChannels ->
                //使用select返回最快统计的那一个
                select<FileLines?> {
                    filteredChannels.forEach {
                        it.onReceiveOrNull {
                            log("received: $it")
                            it
                        }
                    }
                } ?.let {
                    map[it.file] = it.lines
                }
    }
    return map
}

//tailrec-递归优化 定义List<ReceiveChannel<FileLines>>的扩展函数,过滤掉已完成的
tailrec suspend fun List<ReceiveChannel<FileLines>>.aggregate(block: suspend (List<ReceiveChannel<FileLines>>) -> Unit) {
    block(this)//消费一次
    //从当前list中过掉isClosedForReceive=true的ReceiveChannel
    filter { !it.isClosedForReceive }.takeIf { it.isNotEmpty() }?.aggregate(block)//递归
}

在这里插入图片描述
Sequence中不能调用其他挂起函数,不能设置调度器,只能单线程中使用。而Flow可以支持:
在这里插入图片描述
在这里插入图片描述
Flow中调用delay会把后面的代码切换到默认调度器上执行,也可以显示的指定调度器:

在这里插入图片描述
在这里插入图片描述

suspend fun flows(){
    val intFlow = flow {
        emit(1)
        delay(100)
        emit(2)
        emit(3)
    }
    val dispatcher = Executors.newSingleThreadExecutor { Thread(it, "MyThread").also { it.isDaemon = true } }.asCoroutineDispatcher()
    GlobalScope.launch(dispatcher) {
        intFlow.flowOn(Dispatchers.IO)
            .collect { log(it) }
    }.join()
}

在这里插入图片描述
对比RxJava的线程切换方式很像,flowOn传递的调度器指定flow里面的代码执行在哪个线程上,而launch传递的调度器指定flow执行完后resume恢复执行在哪个线程上。
在这里插入图片描述
在这里插入图片描述
flow-catch-onCompletion 和 java的 try-catch-finally 基本类似,onCompletion中的代码是一定会执行的,不同的是有异常发生的时候,会携带一个异常参数。

suspend fun exception(){
    flow<Int> {
        emit(1)
        throw ArithmeticException("Div 0")
    }.catch {t: Throwable ->
        log("caught error: $t")
    }.onCompletion { t: Throwable? ->
        log("finally.")
    }.flowOn(Dispatchers.IO)
        .collect { log(it) }

//    flow { // bad!!!
//        try {
//            emit(1)
//            throw ArithmeticException("Div 0")
//        } catch (t: Throwable){
//            log("caught error: $t")
//        } finally {
//            log("finally.")
//        }
//    }
}

flow的异常捕获使用flow自己的api处理就行,不需要内部再进行try-catch.

在这里插入图片描述
Flow没有提供单独的取消方法,要取消Flow只需要取消flow.collect { } 所在的协程即可。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
Flow内部不能再去切换线程,如果需要这样做可以使用channelFlow

在这里插入图片描述
由于是流的概念flow也有背压的问题,也就是接受端来不及消费,发送端会累积大量的数据,感觉kotlin抄RxJava也抄了不少啊啊啊。。。背压解决办法,要么采用只保留最新conflate,要么取消之前发送的值collectLatest
在这里插入图片描述

suspend fun backPressure(){
    flow {
        emit(1)
        delay(50)
        emit(2)
    }.collectLatest { value ->
        println("Collecting $value")
        delay(100) // Emulate work
        println("$value collected")
    }
}

上面的例子collectLatest当中100毫秒之后只能接受到2,因为延时100的过程中发送2的时候会把1取消掉。


版权声明:本文为lyabc123456原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。