再看 Kotlin Coroutines
前言
https://www.youtube.com/watch?v=Mj5P47F6nJg
这个演讲太棒了,演讲人是 Roman Elizarov - Project Lead for the Kotlin Programming Language
按照他的 PPT 我自己总结了一下
Kotlin Coroutines Design Story
首先 kotlin 的协程库 = 线程池 + 任务调度,但是这个调度过程比较复杂,而且整个过程也并没有直接接管 JVM 或者 操作系统层面的东西,所以也没有 go routines 的神奇的抢占调度。 然后只有一个关键字 suspend, suspend 函数机制
https://www.youtube.com/watch?v=YrrUCSi72E8
(视频讲的如何 resume 回来,演讲人依然是 Roman Elizarov),并且一再强调 async/await 等其他语言的关键字,在 kotlin 中只是一个基础方法,kotlin 想更通用。
Inspired by async/await
Roman Elizarov 说,kotlin coroutines 的设计原型受到了 C# 的 async/await 的启发
并列举了几个其他语言的例子,要么语言天然支持 async/await,要么有相应的广泛使用的 lib 库支持类似的东西
Kotlin DSL
按照前面说的 async/await 的原型,kotlin coroutines 就可以是类似这种样子(最终当然不是这样子的,为了引出来 suspend):
fun postItem(item: Item) = async {
val token = await(requestToken())
val post = await(createPost(token, item))
}
这里他还解释了一下,async 在这里其实就是个函数,不过 kotlin 支持 trailing lambda,函数体移动到了外面
这个 Trailing Lambda 的存在,可以搞出任意的 dsl,因为函数体可以作为参数,加上可以做为扩展函数,并且移动到了大括号后 如果你是作者,你也可以不叫它 async(kotlin 中确实也不叫 asnyc) 但是,这个 await 显然很 trick,所以在 kotlin 中提出了 suspend 关键字,代码修改成了这样:
suspend fun postItem(item: Item) {
val token = requestToken()
val post = createPost(token, item)
}
这段代码除了 suspend 没有任何多余的关键字或者让人难理解的地方,代码和普通函数没有任何区别 并且说,这并不是一个大胆的设计,或者说是不符合习惯的设计
因为这和 go 很相似,函数还是常规函数,通过一个 go 关键字,就开启了协程(还好以前写过 go,如此看来 go 的哲学还是很牛逼的呀)。
Prototyping Libraries(dsl for kotlin)
这里又讲了一波 go 的设计,go 牛逼!(破音)
仿照 go 的开启协程的方式,kotlin 的 dsl 成为了这样:
suspend fun say(s: String) {
for(i in 0..4) {
delay(100) // 这里是因为之前举了个例子,go 的协程中可以 sleep,但是并不是真的 Java 中的 Thread.sleep
println(s)
}
}
fun main() = mainBlocking{
go {
say(“hello")
}
}
实际上呢?上述的 go 和 mainBlocking 函数,在 kotlin 的实现中,对应了 launch 和 runBlocking 这两个函数
所以,这里来看,很多东西在概念上都是同一个意义,只不过取了不同的名字而已
Roman Elizarov 说,为了让基础库更好,让大家 happy programming
kotlin 天然支持 Thread-bound UI programming
举例说,在 Android 经常会做一个耗时任务,然后最终要把结果用于 UI 更新,而 UI 操作只能在主线程
这里就引出来了 Coroutine Context
,context 绑定了协程任务所需要的任何信息,当然也可以通过指定 context 来指定运行时的线程
看过源码的话就会发现,很多东西都继承自 context,都可以作为 context 的 element,被存起来 Dispatchers 也是如此。听名字就知道适用于调度任务的,通过 launch(Dispatchers.IO) 传入的 io dispatcher 替换掉了 parent context 中默认的 dispatcher
写了个例子:
fun main() {
runBlocking { // 隐式存在一个 coroutineScope 对象(就理解为 context) 作为 parent context
launch(Dispatchers.IO) { // 传入了 dispatcher 替换了 parent 中的 dispatcher
println("thread name = ${Thread.currentThread().name}")
}
delay(1000)
println("thread name = ${Thread.currentThread().name}")
}
print("end")
}
后面又说了高阶函数的情况,没咋听懂也没看懂。感觉就还是说 CoroutineContext 是可以层层传递下去的?
说到这里,Coroutine Scope 和 Coroutine Context 什么关系?
Scope and Context 小插曲
其实查看源码你就会发现:
public interface CoroutineScope {
/**
* The context of this scope.
* Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
* Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
*
* By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
*/
public val coroutineContext: CoroutineContext
}
CoroutineScope 就是持有了一个 CoroutineContext,那到底为啥有俩?
这里是 Roman Elizarov 的文章
https://elizarov.medium.com/coroutine-context-and-scope-c8b255d59055
文章的开头就说,同样的东西在不同的用途上,通常会有不同的名字。所以说,他们可以理解为同一个东西,只是不同场景下的叫法。
CoroutineScope 是为了指出协程启动的 scope
CoroutineContext 提供了额外的、可以覆盖继承自 parent scope 的内容
比如之前举得切换线程的例子,通过传入 Dispatcher(extends CoroutineContext) 替换了 child scope 运行时所处的线程
怎么理解呢?
其实,每当你传入 context 的时候(比如传入 dispatcher),都会产生一个新的 context,代码如下:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
coroutineContext + context 这个就很有意思,其实是运算符重载。context 的 plus 方法如下:
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
注释中也说了,其实 context 像是一个大 Map,两个 context 的组合就像是两个 map 的组合,相同的 key 会被覆盖,就是文中的这张图:
Cancellation
Roman Elizarov 又介绍了一些关于协程任务取消的设计 这里又提了 go。go 可以传入 channel 作为一个 cancellation token,来退出任务,但是他说这样你要自己记得自己怎么处理的
代码如下(为啥举了个菲波那切数列的例子,感觉和并发没啥关系额,不知道有何深意,没理解):
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println(quit)
return
}
}
}
同时又说了 go 也有 Context,但是只要你需要在某一级子任务使用的话,应该是需要自己一层层传递下去的(这里 gopher 有话说,听说 go 语言有一个哲学是,显式优于隐式
),所以它应该就是想这么设计。
kotlin 中是 lib 库帮我们传入了 parent scope 的 context(如果有),并且合并了自己的 context。
然后说,go 经常处理的是一次请求,从 request 到 response,贯穿整个过程,所以引出了 Lifetime prototype。
Lifetime Prototype
这里介绍了一下协程的生命周期的设计原型 因为上面说的是有关取消任务,所以这里 Roman Elizarov 举了个例子
当然,最终这个所谓的 Lifetime,也就是后来的 kotlin coroutines 里面的 Job 这个类(extends Context)。
launch 等函数都可以返回一个 Job 对象,从而对于整个任务进行管理。
Children Coroutines
Roman Elizarov 又举了个小例子
val job1 = launch { say("hello") }
val job2 = launch { say("world") }
// 常规的取消任务的办法
val jobs = CompositeJob()
jobs.add(job1)
jobs.add(job2)
jobs.cancel()
代码里,介绍了取消多个任务的一种办法,但是 Roman Elizarov 觉得这并不是一个好的设计,无故增加了很多个概念、api、代码量等等。(这个好熟悉啊,RxJava CompositiDisposable)
kotlin coroutine 的 context 是可以传递下去的,context 包含了协程任务的所有东西,launch 可以返回 Job 对象,Job 也是 CoroutineContext 子类
context 传递下去后,可以说是在 parent context 中启动了协程任务,这就有了关联性。 所以 kotlin 有了这样的设计:
val job = launch(UI) {
launch(coroutineContext) { say("hello") }
launch(coroutineContext) { say("world") }
}
job.cancel()
这里 job 的取消,就会取消 UI context 下的任务(scope 这时候还没出现,而且 scope 本身也是 context)
Error Propagation
上述这样设计很好,但是这样问题来了,怎么处理错误状况?
比如之前的代码中启动的两个协程任务,那么他们都有可能会失败,而且是并发的,所以外层的协程任务只能等待
并且给出了一个 Job 的生命周期
Scope
前面也知道了,如果存在父子关系的两个协程任务,子任务中出现异常,父任务也会结束 所以 Roman Elizarov 举了个例子
所以想要 sayHelloWorld 函数跟着一起抛异常,Scope 就出现了,通过加一个 withScope 返回了一个新的 Job,如果 scope 相同,sayHelloWorld 同样会抛出异常,父任务结束,如下图:
但是这样的写法不好,verbose and error-pone 即 代码冗长且容易出错
所以呢?最终形成了现在协程库的写法(其实这里也可以看出 scope 是个抽象层的概念,实体就是 context)
而且 launch 只是个方法,并且是 coroutineScope 的 extension function(扩展方法),所以没有 scope 无法调用 launch 方法,也避免了容易出错的问题
同时 suspend 方法,自己会知道自己所处的 scope
Structured Concurrency
Roman Elizarov 说搞完了这一套东西,后来发现了一篇文章
https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful
这套东西称之为 Structured Concurrency
同时,多种语言都有相应的实现或者基于一些 lib 库的实现 远远不止这些
kotlin coroutines 远远不止这些,还有许许多多的细节,并且 scope 上绑了很多扩展函数,比如 async 等等
而且,其实我自己也没实际工程使用过 kotlin coroutines,只是写了几个 hello world 😆
也许可能会有一些理解偏差,但是并不是什么大问题。因为框架在脑海里,如果实际使用时发现结果和预想不一致,再去推敲,反而受益更多
看完这个演讲,感觉自己基础不是很扎实,很多语言方面的理念有些无法理解,但是似乎又有所精进? 对于 kotlin coroutines 以后有机会的话多多使用,才更有体会