refer
1-quick start
1-1 hello world
1)-ๅฟซ้็ Hello World
dependencies {
// runtimeOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")
}import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}runBlockingๅฏๅจไบไธไธช ไธไธๆ. ๅชๆ่ฟไธชScopeๅ ้จๆ่ฝๅฏๅจๅ็จlaunchๅๆฏๅฏๅจๅ็จ็ๆนๆณdelay:suspendๆนๆณ, ๆๅ็จๆ่ตท,็ญๅพ ๅค้, ๅฎๅ จไธไผblockๅบๅฑ็thread, ่ฟไธชthreadไผ่ขซๅ ถไป็ๅ็จ่ฐๅบฆไฝฟ็จ.
1-2 structured concurrency
1)-Demo
ๅ็จไธๅฎ่ฆๅจ CoroutineScope ไธญๆ่ฝๅฏๅจ. ่ฟๆฏๅบไบ golang ็ routine ็ไผ็ผบ็นๅๅค่ฎจ่ฎบไนๅ็ ๅ็จ่ๅผ, ็ปๆๅๅนถๅ.
- ๅ็จ็็ๅฝๅจๆๅจ
Scopeไธญไผ่ขซๆ็กฎ็ๅฎไน ; - ๅ็จๅฏไปฅๆ ็ถๅญๅ ณ็ณป, ็ถๅ็จไธไผๅจ ๅญๅ็จๅฎๆไนๅ็ปๆ ;
sequenceDiagram participant P as Parent Scope participant C1 as Child Coroutine 1 participant C2 as Child Coroutine 2 P->>C1: Launch P->>C2: Launch C1-->>C1: Execute C2-->>C2: Execute C1-->>P: Complete C2-->>P: Complete P-->>P: All children complete P-->>P: Scope completes
fun main() = runBlocking {
try {
coroutineScope { // ๅๅปบไธไธชๅ็จไฝ็จๅ
launch {
delay(500L)
println("Task 1 ๅฎๆ")
}
launch {
delay(300L)
println("Task 2 ๅฎๆ")
throw RuntimeException("Task 2 ๅบ้")
}
launch {
delay(400L)
println("Task 3 ๅฎๆ")
}
println("็ญๅพ
ๆๆไปปๅกๅฎๆ")
}
} catch (e: Exception) {
println("ๆ่ทๅฐๅผๅธธ: ${e.message}")
}
println("ๆๆไปปๅกๅทฒๅฎๆๆ่ขซๅๆถ")
}2)-History
็ปๆๅๅนถๅๆจกๅผ ็ๆฆๅฟตๆฏ 2018 ๅนด็ฑ Nathaniel J Smith ๆๅบ. ็่ฎบๅบ็ก:
- ๆๅนถๅ็จๅบ็ๆง่กๆต็จ ็ป็ปไธบไธไธช ๆ ็ถ็ปๆ๏ผ ่ฎฉ็จๅบ็ๆง่ก่ทฏๅพๆดๅ ็ๆธ ๆฐๅๅฏไปฅ้ขๆต;
- ่งฃๅณ่ตๆบๆณ้ฒ้ฎ้ข: ้่ฟๆๅนถๅๆไฝ้ๅถๅจไธไธช็นๆฎ็ ไฝ็จๅไธญ๏ผ็กฎไฟๆๆ็ๅญไปปๅก ๅจไฝ็จๅ็ปๆ็ๆถๅไผ่ขซ่ชๅจๆธ ็ ;
- ่งฃๅณ้่ฏฏ้ฎ้ข็ๅคๆๆง้ฎ้ข: ๆไพไบ ไธ็ง็ปไธ็ ้่ฏฏไผ ๆญๆบๅถ, ่ฎฉ้่ฏฏๅฏไปฅ ๆ ็ถๅ ณ็ณปๅไธไผ ๆญ ;
- ๆไปปๅก็ ็ๅฝๅจๆๅๅๅปบๅฎ็ไฝ็จๅ็ปๅฎ๏ผ็ฎๅ็ฎก็
3) scope
- The coroutine scope is responsible for the structure and parent-child relationships between different coroutines. New coroutines usually
1-3 Example
1)-่ทๅ repos ็ Contributors
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<GithubRepo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<GithubUser>>
}fun loadContributorsBlocking(
service: GitHubService,
req: RequestData
): List<User> {
val repos = service
.getOrgReposCall(req.org) // #1. ๅ่ตท API, ่ทๅๆๆ็ไปๅบ
.execute() // #2. ๅๆญฅๆง่ก
.also { logRepos(req, it) } // #3. ่ฎฐๅฝๆฅๅฟ
.body() ?: emptyList() // #4. ่ทๅๅๅบไฝ
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name) // #1. ๅฏนๆฏไธชไปๅบ๏ผ่ทๅ Contributors
.execute() // #2
.also { logUsers(repo, it) } // #3
.bodyList() // #4
}.aggregate()
}็ปๆๅฆไธ:
1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors ...
when (getSelectedVariant()) {
BLOCKING -> { // Blocking UI thread
val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)
}
}2)-ๅผๆญฅๅ็ๆนๅผ
fun loadContributorsBackground(
service: GitHubService,
req: RequestData,
updateResults: (List<User>) -> Unit
) {
thread {
loadContributorsBlocking(service, req)
}
}3)-callback ็ๆนๅผ
fun loadContributorsCallbacks(
service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit
) {
service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers -> // #2
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
}
// TODO: Why doesn't this code work? How to fix that?
updateResults(allUsers.aggregate())
}- ่ฟ้็
onResponseๆฏไธไธชๅผๆญฅๅ็callback. ไฝๆฏupdateResultsไผๆ้ฎ้ข . **ๆญฃ็กฎ็ๆถๆบๆฏๅจๆๆ็ๅญไปปๅก้ฝ็ปๆๅ๏ผ่ฐ็จupdateResults**
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name)
.onResponse { responseUsers ->
// processing repository
countDownLatch.countDown()
}
}
countDownLatch.await()
updateResults(allUsers.aggregate())- ไธ็งๅงฟๅฟไฝฟ็จ
countDownLatchๆฅ่งฃๅณ้ฎ้ข.
4)-ไฝฟ็จ suspend ๆฅ่งฃๅณ้ฎ้ข
interface GithubApiService {
@GET("mock/github/orgs/{org}/repos")
suspend fun getOrgRepos(@Path("org") org: String): Response<List<GithubRepo>>
@GET("mock/github/repos/{owner}/{repo}/contributors")
suspend fun getRepoContributors(
@Path("owner") owner: String, @Path("repo") repo: String
): Response<List<GithubUser>>
}- ่ฟๅๅผๅไธบ
Response่ฟไธชๅบ่ฏฅๆฏๅๆญฅ็, ไนๅ็Call
suspend fun loadContributorsSuspend(req: RequestData): List<GithubUser> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body()!!
val users = repos.flatMap { repo ->
val contributors = service.getRepoContributors("carl", repo.name).also {
logUsers(repo, it)
}.body()!!
contributors
}
return users
}Tips
ๅ็จ็ ๅฎ็ฐไพ่ตไบ ๅ็จๅบ. ๅฆๆ่ฟๆฏ็บฟ็จ้ปๅก็่ฐ็จ๏ผไพๆงไผ้ปๅก็บฟ็จ
- ๆญคๆถ๏ผๅฝๅ้ป่พๅไธบไบ ๅผๆญฅ๏ผ็ฑปไผผไธ้ข ็บฟ็จๅๆนๆก็ๅๆณ.
5)-ๅตๅฅๅ็จๅฎ็ฐๅนถ่ก
suspend fun loadContributorsSuspend(req: RequestData): List<GithubUser> {
return coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body()!!
val users = repos.flatMap { repo ->
val contributors = service.getRepoContributors("carl", repo.name).also {
logUsers(repo, it)
}.body()!!
contributors
}
val deferredUsers = repos.map { repo ->
async {
val contributors = service.getRepoContributors("carl", repo.name).also {
logUsers(repo, it)
}.body()!!
contributors
}
}
deferredUsers.awaitAll().flatten()
}
}- ไฝฟ็จ
async่ไธๆฏlaunch็ฑปไผผCallableๅฏไปฅๆฅๆถ่ฟๅๅผ Defered็ฑปไผผfutureๆ่promise
Tips
Deferredis a generic type that extendsJob. Anasynccall can returns aDeferred<Int>or aDeferred<CustomType>, depending on what the lambda returns (the last expression inside the lambda is the result).
2-Cancellation
1)-Whatโs cancel
่ฟๅ็ Job ๅฏน่ฑกๅฐฑ่ช็ถๆ Cancelled ็่ฝๅ.
้ฆๅ ่ฆ็่งฃไปไนๆ ท็ไธ่ฅฟๅฏไปฅๅๅฐ ๅๆถ. ๅๆถๆบๅถ็ๆฌ่ดจๆฏๅไฝ็, ๆขๅฅ่ฏ่ฏด๏ผ่ฆๆณๅๆถ๏ผ่ฆ้ ๅ็จไปฃ็ ๅปไธปๅจ็ๆฃๆฅๆฏๅฆ่ขซๅๆถ.
ไธ้ข็ไปฃ็ ไธไผไธปๅจๅๆถ
while (true) {
println("still working")
}ๅฏไปฅไฝฟ็จ isActive ๆ่ ensureActive ๆฅๆฏๆๅๆถ
fun main() {
runBlocking(Dispatchers.Default) {
val job = launch {
// repeat(1000) { i ->
// println("job: I'm sleeping $i ...")
// delay(500L)
// }
while (true) {
ensureActive()
println("still working")
}
}
// delay(1300L)
println("main: I'm tired of waiting!")
job.cancel()
job.join()
}
}delay ้่ฟๅผๅธธๆฅ่งฆๅๅไฝๅๆถ.
val job = launch(Dispatchers.Default) {
repeat(5) { i ->
try {
// print a message twice a second
println("job: I'm sleeping $i ...")
delay(500)
} catch (e: Exception) {
// log the exception
println(e)
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")- ็ฑไบๅผๅธธ่ขซๅไบๅฏผ่ด ๆ ๆณๅๆถ
2)-Cancel ็2็งๅงฟๅฟ
- ไฝฟ็จไธไธช
suspend function, ๆฏๅฆ่ฏดyieldๅฐฑ้ๅธธๅ้ - ๅฎๆถๆฃๆฅ๏ผ็ฑปไผผไธ้ข็
isActive
3)-Finally
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")cancelAndJoin่ฟไธชๆนๆณ
4)-ๅฏไปฅไฝฟ็จ NonCancellable ๅๅปบไธๅฏๅๆถ็ไปฃ็ ๅ
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")5)-Timeout ๅ่ตๆบๆณ้ฒ
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
val resource = withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
Resource() // Acquire a resource and return it from withTimeout block
}
resource.close() // Release the resource
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}- ๆๅฏ่ฝ่ตๆบๆณ้ฒ๏ผๅบ่ฏฅ็จ try-finally ๅ ่ฃ ่ตทๆฅ