refer

1-quick start

1-1 hello world

1)-ๅฟซ้€Ÿ็š„ Hello World

dependencies {  
//    runtimeOnly("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")  
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")  
}
import kotlinx.coroutines.*  
  
fun main() = runBlocking { // this: CoroutineScope  
    launch { // launch a new coroutine and continue  
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)  
        println("World!") // print after delay  
    }  
    println("Hello") // main coroutine continues while a previous one is delayed  
}
  • runBlocking ๅฏๅŠจไบ†ไธ€ไธช ไธŠไธ‹ๆ–‡. ๅชๆœ‰่ฟ™ไธช Scope ๅ†…้ƒจๆ‰่ƒฝๅฏๅŠจๅ็จ‹
  • launch ๅˆ™ๆ˜ฏๅฏๅŠจๅ็จ‹็š„ๆ–นๆณ•
  • delay : suspend ๆ–นๆณ•, ๆŠŠๅ็จ‹ๆŒ‚่ตท,็ญ‰ๅพ…ๅ”ค้†’, ๅฎŒๅ…จไธไผš block ๅบ•ๅฑ‚็š„ thread , ่ฟ™ไธช thread ไผš่ขซๅ…ถไป–็š„ๅ็จ‹่ฐƒๅบฆไฝฟ็”จ.

1-2 structured concurrency

1)-Demo

ๅ็จ‹ไธ€ๅฎš่ฆๅœจ CoroutineScope ไธญๆ‰่ƒฝๅฏๅŠจ. ่ฟ™ๆ˜ฏๅŸบไบŽ golang ็š„ routine ็š„ไผ˜็ผบ็‚นๅๅค่ฎจ่ฎบไน‹ๅŽ็š„ ๅ็จ‹่Œƒๅผ, ็ป“ๆž„ๅŒ–ๅนถๅ‘.

  1. ๅ็จ‹็š„็”Ÿๅ‘ฝๅ‘จๆœŸๅœจ Scope ไธญไผš่ขซๆ˜Ž็กฎ็š„ๅฎšไน‰ ;
  2. ๅ็จ‹ๅฏไปฅๆœ‰ ็ˆถๅญๅ…ณ็ณป, ็ˆถๅ็จ‹ไธไผšๅœจ ๅญๅ็จ‹ๅฎŒๆˆไน‹ๅ‰็ป“ๆŸ ;
sequenceDiagram
    participant P as Parent Scope
    participant C1 as Child Coroutine 1
    participant C2 as Child Coroutine 2

    P->>C1: Launch
    P->>C2: Launch
    C1-->>C1: Execute
    C2-->>C2: Execute
    C1-->>P: Complete
    C2-->>P: Complete
    P-->>P: All children complete
    P-->>P: Scope completes
fun main() = runBlocking {  
    try {  
        coroutineScope { // ๅˆ›ๅปบไธ€ไธชๅ็จ‹ไฝœ็”จๅŸŸ  
            launch {  
                delay(500L)  
                println("Task 1 ๅฎŒๆˆ")  
            }  
            launch {  
                delay(300L)  
                println("Task 2 ๅฎŒๆˆ")  
                throw RuntimeException("Task 2 ๅ‡บ้”™")  
            }  
            launch {  
                delay(400L)  
                println("Task 3 ๅฎŒๆˆ")  
            }  
            println("็ญ‰ๅพ…ๆ‰€ๆœ‰ไปปๅŠกๅฎŒๆˆ")  
        }  
    } catch (e: Exception) {  
        println("ๆ•่Žทๅˆฐๅผ‚ๅธธ: ${e.message}")  
    }  
    println("ๆ‰€ๆœ‰ไปปๅŠกๅทฒๅฎŒๆˆๆˆ–่ขซๅ–ๆถˆ")  
}

2)-History

็ป“ๆž„ๅŒ–ๅนถๅ‘ๆจกๅผ ็š„ๆฆ‚ๅฟตๆ˜ฏ 2018 ๅนด็”ฑ Nathaniel J Smith ๆๅ‡บ. ็†่ฎบๅŸบ็ก€:

  1. ๆŠŠๅนถๅ‘็จ‹ๅบ็š„ๆ‰ง่กŒๆต็จ‹ ็ป„็ป‡ไธบไธ€ไธช ๆ ‘็Šถ็ป“ๆž„๏ผŒ ่ฎฉ็จ‹ๅบ็š„ๆ‰ง่กŒ่ทฏๅพ„ๆ›ดๅŠ ็š„ๆธ…ๆ™ฐๅ’Œๅฏไปฅ้ข„ๆต‹;
  2. ่งฃๅ†ณ่ต„ๆบๆณ„้œฒ้—ฎ้ข˜: ้€š่ฟ‡ๆŠŠๅนถๅ‘ๆ“ไฝœ้™ๅˆถๅœจไธ€ไธช็‰นๆฎŠ็š„ ไฝœ็”จๅŸŸไธญ๏ผŒ็กฎไฟๆ‰€ๆœ‰็š„ๅญไปปๅŠก ๅœจไฝœ็”จๅŸŸ็ป“ๆŸ็š„ๆ—ถๅ€™ไผš่ขซ่‡ชๅŠจๆธ…็† ;
  3. ่งฃๅ†ณ้”™่ฏฏ้—ฎ้ข˜็š„ๅคๆ‚ๆ€ง้—ฎ้ข˜: ๆไพ›ไบ† ไธ€็ง็ปŸไธ€็š„ ้”™่ฏฏไผ ๆ’ญๆœบๅˆถ, ่ฎฉ้”™่ฏฏๅฏไปฅ ๆ ‘็Šถๅ…ณ็ณปๅ‘ไธŠไผ ๆ’ญ ;
  4. ๆŠŠไปปๅŠก็š„ ็”Ÿๅ‘ฝๅ‘จๆœŸๅ’Œๅˆ›ๅปบๅฎƒ็š„ไฝœ็”จๅŸŸ็ป‘ๅฎš๏ผŒ็ฎ€ๅŒ–็ฎก็†

3) scope

  • The coroutine scope is responsible for the structure and parent-child relationships between different coroutines. New coroutines usually

1-3 Example

1)-่Žทๅ– repos ็š„ Contributors

interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    fun getOrgReposCall(
        @Path("org") org: String
    ): Call<List<GithubRepo>>
 
    @GET("repos/{owner}/{repo}/contributors?per_page=100")
    fun getRepoContributorsCall(
        @Path("owner") owner: String,
        @Path("repo") repo: String
    ): Call<List<GithubUser>>
}
fun loadContributorsBlocking(
    service: GitHubService,
    req: RequestData
): List<User> {
    val repos = service
        .getOrgReposCall(req.org)   // #1. ๅ‘่ตท API, ่Žทๅ–ๆ‰€ๆœ‰็š„ไป“ๅบ“
        .execute()                  // #2. ๅŒๆญฅๆ‰ง่กŒ
        .also { logRepos(req, it) } // #3. ่ฎฐๅฝ•ๆ—ฅๅฟ—
        .body() ?: emptyList()      // #4. ่Žทๅ–ๅ“ๅบ”ไฝ“
 
    return repos.flatMap { repo ->
        service
            .getRepoContributorsCall(req.org, repo.name) // #1. ๅฏนๆฏไธชไป“ๅบ“๏ผŒ่Žทๅ– Contributors
            .execute()                                   // #2
            .also { logUsers(repo, it) }                 // #3
            .bodyList()                                  // #4
    }.aggregate()
}

็ป“ๆžœๅฆ‚ไธ‹:

1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors ...
when (getSelectedVariant()) {
    BLOCKING -> { // Blocking UI thread
        val users = loadContributorsBlocking(service, req)
        updateResults(users, startTime)
    }
}

2)-ๅผ‚ๆญฅๅŒ–็š„ๆ–นๅผ

fun loadContributorsBackground(
    service: GitHubService,
    req: RequestData,
    updateResults: (List<User>) -> Unit
) {
    thread {
        loadContributorsBlocking(service, req)
    }
}

3)-callback ็š„ๆ–นๅผ

fun loadContributorsCallbacks(
    service: GitHubService, req: RequestData,
    updateResults: (List<User>) -> Unit
) {
    service.getOrgReposCall(req.org).onResponse { responseRepos ->  // #1
        logRepos(req, responseRepos)
        val repos = responseRepos.bodyList()
 
        val allUsers = mutableListOf<User>()
        for (repo in repos) {
            service.getRepoContributorsCall(req.org, repo.name)
                .onResponse { responseUsers ->  // #2
                    logUsers(repo, responseUsers)
                    val users = responseUsers.bodyList()
                    allUsers += users
                }
            }
        }
        // TODO: Why doesn't this code work? How to fix that?
        updateResults(allUsers.aggregate())
    }
  • ่ฟ™้‡Œ็š„ onResponse ๆ˜ฏไธ€ไธชๅผ‚ๆญฅๅŒ–็š„ callback. ไฝ†ๆ˜ฏ updateResults ไผšๆœ‰้—ฎ้ข˜ . **ๆญฃ็กฎ็š„ๆ—ถๆœบๆ˜ฏๅœจๆ‰€ๆœ‰็š„ๅญไปปๅŠก้ƒฝ็ป“ๆŸๅŽ๏ผŒ่ฐƒ็”จ updateResults **
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
    service.getRepoContributorsCall(req.org, repo.name)
        .onResponse { responseUsers ->
            // processing repository
            countDownLatch.countDown()
        }
}
countDownLatch.await()
updateResults(allUsers.aggregate())
  • ไธ€็งๅงฟๅŠฟไฝฟ็”จ countDownLatch ๆฅ่งฃๅ†ณ้—ฎ้ข˜.

4)-ไฝฟ็”จ suspend ๆฅ่งฃๅ†ณ้—ฎ้ข˜

interface GithubApiService {  
    @GET("mock/github/orgs/{org}/repos")  
    suspend fun getOrgRepos(@Path("org") org: String): Response<List<GithubRepo>>  
  
    @GET("mock/github/repos/{owner}/{repo}/contributors")  
    suspend fun getRepoContributors(  
        @Path("owner") owner: String, @Path("repo") repo: String  
    ): Response<List<GithubUser>>  
}
  • ่ฟ”ๅ›žๅ€ผๅ˜ไธบ Response ่ฟ™ไธชๅบ”่ฏฅๆ˜ฏๅŒๆญฅ็š„, ไน‹ๅ‰็š„ Call
suspend fun loadContributorsSuspend(req: RequestData): List<GithubUser> {  
    val repos = service  
        .getOrgRepos(req.org)  
        .also { logRepos(req, it) }  
        .body()!!  
  
  
    val users = repos.flatMap { repo ->  
        val contributors = service.getRepoContributors("carl", repo.name).also {  
            logUsers(repo, it)  
        }.body()!!  
  
        contributors  
    }  
  
    return users  
}

Tips

ๅ็จ‹็š„ ๅฎž็Žฐไพ่ต–ไบŽ ๅ็จ‹ๅบ“. ๅฆ‚ๆžœ่ฟ˜ๆ˜ฏ็บฟ็จ‹้˜ปๅกž็š„่ฐƒ็”จ๏ผŒไพๆ—งไผš้˜ปๅกž็บฟ็จ‹

  • ๆญคๆ—ถ๏ผŒๅฝ“ๅ‰้€ป่พ‘ๅ˜ไธบไบ† ๅผ‚ๆญฅ๏ผŒ็ฑปไผผไธŠ้ข ็บฟ็จ‹ๅŒ–ๆ–นๆกˆ็š„ๅšๆณ•.

5)-ๅตŒๅฅ—ๅ็จ‹ๅฎž็Žฐๅนถ่กŒ

suspend fun loadContributorsSuspend(req: RequestData): List<GithubUser> {  
    return coroutineScope {  
        val repos = service  
            .getOrgRepos(req.org)  
            .also { logRepos(req, it) }  
            .body()!!  
  
        val users = repos.flatMap { repo ->  
            val contributors = service.getRepoContributors("carl", repo.name).also {  
                logUsers(repo, it)  
            }.body()!!  
  
            contributors  
        }  
  
        val deferredUsers = repos.map { repo ->  
            async {  
                val contributors = service.getRepoContributors("carl", repo.name).also {  
                    logUsers(repo, it)  
                }.body()!!  
  
  
                contributors  
            }  
        }  
  
        deferredUsers.awaitAll().flatten()  
    }  
}
  • ไฝฟ็”จ async ่€Œไธๆ˜ฏ launch ็ฑปไผผ Callable ๅฏไปฅๆŽฅๆ”ถ่ฟ”ๅ›žๅ€ผ
  • Defered ็ฑปไผผ future ๆˆ–่€… promise

Tips

Deferred is a generic type that extends Job. An async call can returns a Deferred<Int> or a Deferred<CustomType>, depending on what the lambda returns (the last expression inside the lambda is the result).

2-Cancellation

1)-Whatโ€™s cancel

่ฟ”ๅ›ž็š„ Job ๅฏน่ฑกๅฐฑ่‡ช็„ถๆœ‰ Cancelled ็š„่ƒฝๅŠ›.

้ฆ–ๅ…ˆ่ฆ็†่งฃไป€ไนˆๆ ท็š„ไธœ่ฅฟๅฏไปฅๅšๅˆฐ ๅ–ๆถˆ. ๅ–ๆถˆๆœบๅˆถ็š„ๆœฌ่ดจๆ˜ฏๅไฝœ็š„, ๆขๅฅ่ฏ่ฏด๏ผŒ่ฆๆƒณๅ–ๆถˆ๏ผŒ่ฆ้  ๅ็จ‹ไปฃ็ ๅŽปไธปๅŠจ็š„ๆฃ€ๆŸฅๆ˜ฏๅฆ่ขซๅ–ๆถˆ.

ไธ‹้ข็š„ไปฃ็ ไธไผšไธปๅŠจๅ–ๆถˆ

while (true) {  
    println("still working")  
}

ๅฏไปฅไฝฟ็”จ isActive ๆˆ–่€… ensureActive ๆฅๆ”ฏๆŒๅ–ๆถˆ

fun main() {  
    runBlocking(Dispatchers.Default) {  
  
        val job = launch {  
//            repeat(1000) { i ->  
//                println("job: I'm sleeping $i ...")  
//                delay(500L)  
//            }  
  
            while (true) {  
                ensureActive()  
                println("still working")  
            }  
        }  
  
  
//        delay(1300L)  
        println("main: I'm tired of waiting!")  
        job.cancel()  
        job.join()  
    }  
}

delay ้€š่ฟ‡ๅผ‚ๅธธๆฅ่งฆๅ‘ๅไฝœๅ–ๆถˆ.

val job = launch(Dispatchers.Default) {
    repeat(5) { i ->
        try {
            // print a message twice a second
            println("job: I'm sleeping $i ...")
            delay(500)
        } catch (e: Exception) {
            // log the exception
            println(e)
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
  • ็”ฑไบŽๅผ‚ๅธธ่ขซๅƒไบ†ๅฏผ่‡ด ๆ— ๆณ•ๅ–ๆถˆ

2)-Cancel ็š„2็งๅงฟๅŠฟ

  1. ไฝฟ็”จไธ€ไธช suspend function, ๆฏ”ๅฆ‚่ฏด yield ๅฐฑ้žๅธธๅˆ้€‚
  2. ๅฎšๆ—ถๆฃ€ๆŸฅ๏ผŒ็ฑปไผผไธŠ้ข็š„ isActive

3)-Finally

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
  • cancelAndJoin ่ฟ™ไธชๆ–นๆณ•

4)-ๅฏไปฅไฝฟ็”จ NonCancellable ๅˆ›ๅปบไธๅฏๅ–ๆถˆ็š„ไปฃ็ ๅ—

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L)
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

5)-Timeout ๅ’Œ่ต„ๆบๆณ„้œฒ

var acquired = 0
 
class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}
 
fun main() {
    runBlocking {
        repeat(10_000) { // Launch 10K coroutines
            launch { 
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // Acquire a resource and return it from withTimeout block     
                }
                resource.close() // Release the resource
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
}
  • ๆœ‰ๅฏ่ƒฝ่ต„ๆบๆณ„้œฒ๏ผŒๅบ”่ฏฅ็”จ try-finally ๅŒ…่ฃ…่ตทๆฅ