refer
1-quick start
1-1 hello world
1)-快速的 Hello World
dependencies {
// runtimeOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
}import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}runBlocking启动了一个 上下文. 只有这个Scope内部才能启动协程launch则是启动协程的方法delay:suspend方法, 把协程挂起,等待唤醒, 完全不会block底层的thread, 这个thread会被其他的协程调度使用.
1-2 structured concurrency
1)-Demo
协程一定要在 CoroutineScope 中才能启动. 这是基于 golang 的 routine 的优缺点反复讨论之后的 协程范式, 结构化并发.
- 协程的生命周期在
Scope中会被明确的定义 ; - 协程可以有 父子关系, 父协程不会在 子协程完成之前结束 ;
sequenceDiagram participant P as Parent Scope participant C1 as Child Coroutine 1 participant C2 as Child Coroutine 2 P->>C1: Launch P->>C2: Launch C1-->>C1: Execute C2-->>C2: Execute C1-->>P: Complete C2-->>P: Complete P-->>P: All children complete P-->>P: Scope completes
fun main() = runBlocking {
try {
coroutineScope { // 创建一个协程作用域
launch {
delay(500L)
println("Task 1 完成")
}
launch {
delay(300L)
println("Task 2 完成")
throw RuntimeException("Task 2 出错")
}
launch {
delay(400L)
println("Task 3 完成")
}
println("等待所有任务完成")
}
} catch (e: Exception) {
println("捕获到异常: ${e.message}")
}
println("所有任务已完成或被取消")
}2)-History
结构化并发模式 的概念是 2018 年由 Nathaniel J Smith 提出. 理论基础:
- 把并发程序的执行流程 组织为一个 树状结构, 让程序的执行路径更加的清晰和可以预测;
- 解决资源泄露问题: 通过把并发操作限制在一个特殊的 作用域中,确保所有的子任务 在作用域结束的时候会被自动清理 ;
- 解决错误问题的复杂性问题: 提供了 一种统一的 错误传播机制, 让错误可以 树状关系向上传播 ;
- 把任务的 生命周期和创建它的作用域绑定,简化管理
3) scope
- The coroutine scope is responsible for the structure and parent-child relationships between different coroutines. New coroutines usually
1-3 Example
1)-获取 repos 的 Contributors
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<GithubRepo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<GithubUser>>
}fun loadContributorsBlocking(
service: GitHubService,
req: RequestData
): List<User> {
val repos = service
.getOrgReposCall(req.org) // #1. 发起 API, 获取所有的仓库
.execute() // #2. 同步执行
.also { logRepos(req, it) } // #3. 记录日志
.body() ?: emptyList() // #4. 获取响应体
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name) // #1. 对每个仓库,获取 Contributors
.execute() // #2
.also { logUsers(repo, it) } // #3
.bodyList() // #4
}.aggregate()
}结果如下:
1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors ...
when (getSelectedVariant()) {
BLOCKING -> { // Blocking UI thread
val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)
}
}2)-异步化的方式
fun loadContributorsBackground(
service: GitHubService,
req: RequestData,
updateResults: (List<User>) -> Unit
) {
thread {
loadContributorsBlocking(service, req)
}
}3)-callback 的方式
fun loadContributorsCallbacks(
service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit
) {
service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers -> // #2
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
}
// TODO: Why doesn't this code work? How to fix that?
updateResults(allUsers.aggregate())
}- 这里的
onResponse是一个异步化的callback. 但是updateResults会有问题 . **正确的时机是在所有的子任务都结束后,调用updateResults**
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
// processing repository
countDownLatch.countDown()
}
}
countDownLatch.await()
updateResults(allUsers.aggregate())- 一种姿势使用
countDownLatch来解决问题.
4)-使用 suspend 来解决问题
interface GithubApiService {
@GET("mock/github/orgs/{org}/repos")
suspend fun getOrgRepos(@Path("org") org: String): Response<List<GithubRepo>>
@GET("mock/github/repos/{owner}/{repo}/contributors")
suspend fun getRepoContributors(
@Path("owner") owner: String, @Path("repo") repo: String
): Response<List<GithubUser>>
}- 返回值变为
Response这个应该是同步的, 之前的Call
suspend fun loadContributorsSuspend(req: RequestData): List<GithubUser> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body()!!
val users = repos.flatMap { repo ->
val contributors = service.getRepoContributors("carl", repo.name).also {
logUsers(repo, it)
}.body()!!
contributors
}
return users
}Tips
协程的 实现依赖于 协程库. 如果还是线程阻塞的调用,依旧会阻塞线程
- 此时,当前逻辑变为了 异步,类似上面 线程化方案的做法.
5)-嵌套协程实现并行
suspend fun loadContributorsSuspend(req: RequestData): List<GithubUser> {
return coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body()!!
val users = repos.flatMap { repo ->
val contributors = service.getRepoContributors("carl", repo.name).also {
logUsers(repo, it)
}.body()!!
contributors
}
val deferredUsers = repos.map { repo ->
async {
val contributors = service.getRepoContributors("carl", repo.name).also {
logUsers(repo, it)
}.body()!!
contributors
}
}
deferredUsers.awaitAll().flatten()
}
}- 使用
async而不是launch类似Callable可以接收返回值 Defered类似future或者promise
Tips
Deferredis a generic type that extendsJob. Anasynccall can returns aDeferred<Int>or aDeferred<CustomType>, depending on what the lambda returns (the last expression inside the lambda is the result).
2-Cancellation
1)-What’s cancel
返回的 Job 对象就自然有 Cancelled 的能力.
首先要理解什么样的东西可以做到 取消. 取消机制的本质是协作的, 换句话说,要想取消,要靠 协程代码去主动的检查是否被取消.
下面的代码不会主动取消
while (true) {
println("still working")
}可以使用 isActive 或者 ensureActive 来支持取消
fun main() {
runBlocking(Dispatchers.Default) {
val job = launch {
// repeat(1000) { i ->
// println("job: I'm sleeping $i ...")
// delay(500L)
// }
while (true) {
ensureActive()
println("still working")
}
}
// delay(1300L)
println("main: I'm tired of waiting!")
job.cancel()
job.join()
}
}delay 通过异常来触发协作取消.
val job = launch(Dispatchers.Default) {
repeat(5) { i ->
try {
// print a message twice a second
println("job: I'm sleeping $i ...")
delay(500)
} catch (e: Exception) {
// log the exception
println(e)
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")- 由于异常被吃了导致 无法取消
2)-Cancel 的2种姿势
- 使用一个
suspend function, 比如说yield就非常合适 - 定时检查,类似上面的
isActive
3)-Finally
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")cancelAndJoin这个方法
4)-可以使用 NonCancellable 创建不可取消的代码块
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")5)-Timeout 和资源泄露
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
val resource = withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
Resource() // Acquire a resource and return it from withTimeout block
}
resource.close() // Release the resource
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}- 有可能资源泄露,应该用 try-finally 包装起来