目录

再看 Kotlin Coroutines

前言

https://www.youtube.com/watch?v=Mj5P47F6nJg

这个演讲太棒了,演讲人是 Roman Elizarov - Project Lead for the Kotlin Programming Language

按照他的 PPT 我自己总结了一下

Kotlin Coroutines Design Story

首先 kotlin 的协程库 = 线程池 + 任务调度,但是这个调度过程比较复杂,而且整个过程也并没有直接接管 JVM 或者 操作系统层面的东西,所以也没有 go routines 的神奇的抢占调度。 然后只有一个关键字 suspend, suspend 函数机制

https://www.youtube.com/watch?v=YrrUCSi72E8

(视频讲的如何 resume 回来,演讲人依然是 Roman Elizarov),并且一再强调 async/await 等其他语言的关键字,在 kotlin 中只是一个基础方法,kotlin 想更通用。

Inspired by async/await

Roman Elizarov 说,kotlin coroutines 的设计原型受到了 C# 的 async/await 的启发

/img/in-post/inspired_async.jpeg

并列举了几个其他语言的例子,要么语言天然支持 async/await,要么有相应的广泛使用的 lib 库支持类似的东西

/img/in-post/inspired_by_other.jpeg

Kotlin DSL

按照前面说的 async/await 的原型,kotlin coroutines 就可以是类似这种样子(最终当然不是这样子的,为了引出来 suspend):

fun postItem(item: Item) = async {
  val token = await(requestToken())
  val post = await(createPost(token, item))
}

这里他还解释了一下,async 在这里其实就是个函数,不过 kotlin 支持 trailing lambda,函数体移动到了外面

这个 Trailing Lambda 的存在,可以搞出任意的 dsl,因为函数体可以作为参数,加上可以做为扩展函数,并且移动到了大括号后 如果你是作者,你也可以不叫它 async(kotlin 中确实也不叫 asnyc) 但是,这个 await 显然很 trick,所以在 kotlin 中提出了 suspend 关键字,代码修改成了这样:

suspend fun postItem(item: Item) {
  val token = requestToken()
  val post = createPost(token, item)
}

这段代码除了 suspend 没有任何多余的关键字或者让人难理解的地方,代码和普通函数没有任何区别 并且说,这并不是一个大胆的设计,或者说是不符合习惯的设计

因为这和 go 很相似,函数还是常规函数,通过一个 go 关键字,就开启了协程(还好以前写过 go,如此看来 go 的哲学还是很牛逼的呀)。

Prototyping Libraries(dsl for kotlin)

这里又讲了一波 go 的设计,go 牛逼!(破音)

仿照 go 的开启协程的方式,kotlin 的 dsl 成为了这样:

suspend fun say(s: String) {
  for(i in 0..4) {
    delay(100) // 这里是因为之前举了个例子,go 的协程中可以 sleep,但是并不是真的 Java 中的 Thread.sleep
    println(s)
  }
}

fun main() = mainBlocking{
  go {
    say(hello")
  }
}

实际上呢?上述的 go 和 mainBlocking 函数,在 kotlin 的实现中,对应了 launch 和 runBlocking 这两个函数

所以,这里来看,很多东西在概念上都是同一个意义,只不过取了不同的名字而已

Roman Elizarov 说,为了让基础库更好,让大家 happy programming

kotlin 天然支持 Thread-bound UI programming

举例说,在 Android 经常会做一个耗时任务,然后最终要把结果用于 UI 更新,而 UI 操作只能在主线程

这里就引出来了 Coroutine Context,context 绑定了协程任务所需要的任何信息,当然也可以通过指定 context 来指定运行时的线程

看过源码的话就会发现,很多东西都继承自 context,都可以作为 context 的 element,被存起来 Dispatchers 也是如此。听名字就知道适用于调度任务的,通过 launch(Dispatchers.IO) 传入的 io dispatcher 替换掉了 parent context 中默认的 dispatcher

写了个例子:

fun main() {
    runBlocking { // 隐式存在一个 coroutineScope 对象(就理解为 context) 作为 parent context
        launch(Dispatchers.IO) { // 传入了 dispatcher 替换了 parent 中的 dispatcher 
            println("thread name = ${Thread.currentThread().name}")
        }
        delay(1000)
        println("thread name = ${Thread.currentThread().name}")
    }
    print("end")
}

后面又说了高阶函数的情况,没咋听懂也没看懂。感觉就还是说 CoroutineContext 是可以层层传递下去的?

说到这里,Coroutine Scope 和 Coroutine Context 什么关系?

Scope and Context 小插曲

其实查看源码你就会发现:

public interface CoroutineScope {
    /**
     * The context of this scope.
     * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
     * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
     *
     * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
     */
    public val coroutineContext: CoroutineContext
}

CoroutineScope 就是持有了一个 CoroutineContext,那到底为啥有俩?

这里是 Roman Elizarov 的文章

https://elizarov.medium.com/coroutine-context-and-scope-c8b255d59055

文章的开头就说,同样的东西在不同的用途上,通常会有不同的名字。所以说,他们可以理解为同一个东西,只是不同场景下的叫法。

CoroutineScope 是为了指出协程启动的 scope

CoroutineContext 提供了额外的、可以覆盖继承自 parent scope 的内容

比如之前举得切换线程的例子,通过传入 Dispatcher(extends CoroutineContext) 替换了 child scope 运行时所处的线程

怎么理解呢?

其实,每当你传入 context 的时候(比如传入 dispatcher),都会产生一个新的 context,代码如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

coroutineContext + context 这个就很有意思,其实是运算符重载。context 的 plus 方法如下:

 /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

注释中也说了,其实 context 像是一个大 Map,两个 context 的组合就像是两个 map 的组合,相同的 key 会被覆盖,就是文中的这张图:

/img/in-post/conroutines_scope.png

Cancellation

Roman Elizarov 又介绍了一些关于协程任务取消的设计 这里又提了 go。go 可以传入 channel 作为一个 cancellation token,来退出任务,但是他说这样你要自己记得自己怎么处理的

代码如下(为啥举了个菲波那切数列的例子,感觉和并发没啥关系额,不知道有何深意,没理解):

func fibonacci(c, quit chan int) {
  x, y := 0, 1
  for {
    select {
      case c <- x:
        x, y = y, x+y
      case <-quit:
        fmt.Println(quit)
        return
    }
  }
}

同时又说了 go 也有 Context,但是只要你需要在某一级子任务使用的话,应该是需要自己一层层传递下去的(这里 gopher 有话说,听说 go 语言有一个哲学是,显式优于隐式),所以它应该就是想这么设计。

kotlin 中是 lib 库帮我们传入了 parent scope 的 context(如果有),并且合并了自己的 context。

然后说,go 经常处理的是一次请求,从 request 到 response,贯穿整个过程,所以引出了 Lifetime prototype。

Lifetime Prototype

这里介绍了一下协程的生命周期的设计原型 因为上面说的是有关取消任务,所以这里 Roman Elizarov 举了个例子

/img/in-post/lifetime_prototype.jpeg

当然,最终这个所谓的 Lifetime,也就是后来的 kotlin coroutines 里面的 Job 这个类(extends Context)。

launch 等函数都可以返回一个 Job 对象,从而对于整个任务进行管理。

Children Coroutines

Roman Elizarov 又举了个小例子

val job1 = launch { say("hello") }
val job2 = launch { say("world") }

// 常规的取消任务的办法
val jobs = CompositeJob()
jobs.add(job1)
jobs.add(job2)
jobs.cancel()

代码里,介绍了取消多个任务的一种办法,但是 Roman Elizarov 觉得这并不是一个好的设计,无故增加了很多个概念、api、代码量等等。(这个好熟悉啊,RxJava CompositiDisposable)

kotlin coroutine 的 context 是可以传递下去的,context 包含了协程任务的所有东西,launch 可以返回 Job 对象,Job 也是 CoroutineContext 子类

context 传递下去后,可以说是在 parent context 中启动了协程任务,这就有了关联性。 所以 kotlin 有了这样的设计:

val job =  launch(UI) {
  launch(coroutineContext) { say("hello") }
  launch(coroutineContext) { say("world") }
}
job.cancel()

这里 job 的取消,就会取消 UI context 下的任务(scope 这时候还没出现,而且 scope 本身也是 context)

Error Propagation

上述这样设计很好,但是这样问题来了,怎么处理错误状况?

比如之前的代码中启动的两个协程任务,那么他们都有可能会失败,而且是并发的,所以外层的协程任务只能等待

/img/in-post/error_propagation.jpeg

并且给出了一个 Job 的生命周期

/img/in-post/job.jpeg

Scope

前面也知道了,如果存在父子关系的两个协程任务,子任务中出现异常,父任务也会结束 所以 Roman Elizarov 举了个例子

/img/in-post/decomposition.jpeg

所以想要 sayHelloWorld 函数跟着一起抛异常,Scope 就出现了,通过加一个 withScope 返回了一个新的 Job,如果 scope 相同,sayHelloWorld 同样会抛出异常,父任务结束,如下图:

/img/in-post/scope_concurrency.jpeg

但是这样的写法不好,verbose and error-pone 即 代码冗长且容易出错

所以呢?最终形成了现在协程库的写法(其实这里也可以看出 scope 是个抽象层的概念,实体就是 context)

而且 launch 只是个方法,并且是 coroutineScope 的 extension function(扩展方法),所以没有 scope 无法调用 launch 方法,也避免了容易出错的问题

同时 suspend 方法,自己会知道自己所处的 scope

Structured Concurrency

Roman Elizarov 说搞完了这一套东西,后来发现了一篇文章

https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful

这套东西称之为 Structured Concurrency

/img/in-post/structured_concurrency.jpeg

同时,多种语言都有相应的实现或者基于一些 lib 库的实现 远远不止这些

kotlin coroutines 远远不止这些,还有许许多多的细节,并且 scope 上绑了很多扩展函数,比如 async 等等

而且,其实我自己也没实际工程使用过 kotlin coroutines,只是写了几个 hello world 😆

也许可能会有一些理解偏差,但是并不是什么大问题。因为框架在脑海里,如果实际使用时发现结果和预想不一致,再去推敲,反而受益更多

看完这个演讲,感觉自己基础不是很扎实,很多语言方面的理念有些无法理解,但是似乎又有所精进? 对于 kotlin coroutines 以后有机会的话多多使用,才更有体会