重学安卓之kotlin协程

导读

因为2021年开始做C++,之前的项目还没来得及怎么用kotlin,就直接被爸爸宣布死刑了。所以目前找工作等于需要再学一次kotlin,特别是协程,之前几乎没有了解。买了本书,再加上网上的各种文章,整理一点笔记出来。如果有和我差不多的,可以看一看

协程,协程可以理解为某种自由调度的任务,主要是对函数挂起,恢复。听起来像线程的唤醒,本质上和线程没什么关系。使用协程,我们可以以单线程的方式写多线程的代码,同时还能节省线程资源,并且能方便处理异常信息,返回结果等。我们给函数前面加一个suspend关键字,就变成一个挂起函数,可以在函数内操作协程的挂起和恢复。其中部分内容有重复,不必理会。主要参考资料<深入理解Kotlin协程>,算是一个简单的笔记整理。

协程的API

什么是挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun download(url: String) {
// ...
}
suspend fun bitmapSuspendable(url: String): Bitmap =
// 挂起协程 下载完成后,调用resume
suspendCoroutine<Bitmap> { continuation ->
thread {
try {
continuation.resume(download(url))
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}

}

suspend fun main() {
try {
var bitmap = bitmapSuspendable("url1")
} catch ( e: Exception) {
}
}

上面的代码使用suspendCoroutine函数包装download,协程结束后,使用resume返回bitmap结果。当然,Kotlin 协程提供了更方便的库函数来处理挂起操作,不需要我们来实现。

  • suspend关键字,被suspend关键字修饰的函数叫挂起函数。suspend函数只能被协程或者suspend函数调用。
  • suspendCoroutine 用于挂起协程的函数,返回值类型作为挂起函数的返回值,也就是泛型参数T的实参Bitmap,这个函数除了确实返回值类型外,还能帮我们拿到一个Continuation实例,负责保存和恢复挂起状态。
  • resume: 恢复协程 resumeWithException:将exception携带恢复协程
  • resumeWith: 用于恢复协程执行,并提供一个 Result 对象作为协程的结果,该对象可以包含成功的返回值或异常。
  • resumeWithException: 恢复一个协程,并传递一个异常。一般无需显式调用,因为协程库会自动处理协程的取消和异常传播。

系统提供的函数

1
2
3
4
5
6
7
8
9
10
11
12
suspend fun load(time: Int): String {
delay(1000)
return "result $time"
}

fun main = runBlocking {
val load1 = async { load(1) }
val load2 = async { load(2) }
// 等待获取结果
load1.await()
load2.await()
}

上述代码中,runBlocking用来创建一个阻塞的主协程,runBlocking中的代码会阻塞当前线程,直到所有协程都执行完成,这样可以不改变代码结构的情况下使用挂起函数。还有下面这些常用的函数。通常我们使用launchwithContext多一些,具体使用在后面有,这里就不放代码了。

  • async 启动异步协程,可以通过调用await来等待获取异步结果
  • launch: 启动一个新协程,新协程和当前协程并行运行。但是它不会等待新协程的完成,继续执行后续代码。该函数会返回一个Job对象,可以使用该对象来控制、取消、等待该协协程的执行。
  • withContext: 用于在指定的上下文中执行协程代码块,并等待代码块执行完成。与 launch 不同,withContext 挂起当前协程。常用于在协程中切换到指定的调度器上执行代码。
  • produce: 创建一个生产者协程,用于生成序列化的值。生产者协程可以使用 send 函数将值发送到管道中,并使用 receive 函数从管道中接收值。
  • actor: 创建一个带有状态的协程,用于执行并发操作并保持状态。actor 协程可以接收并处理发送到它的消息,允许在协程之间共享和交换数据。
  • supervisorScope: 创建一个作用域,其中的子协程的失败不会影响其他子协程。当一个子协程失败时,supervisorScope 中的其他子协程仍然可以继续执行。
  • repeat: 创建一个协程,重复执行指定的代码块,类似于循环。可以使用 delay 函数来控制重复执行的间隔。
  • select: 用于在多个挂起操作之间进行选择,类似于 select 语句。可以在一个 select 代码块中等待多个挂起函数的结果,当其中一个操作准备就绪时,select 将执行相应的代码块。

协程怎么创建的

1
2
3
4
5
6
7
8
val continuation = suspend {
println("In Coroutine")
5
}.createCoroutine(object: Continuation<Int>) {
override fun resumeWith(result: Result<Int>) {
println("Coroutine End: $result")
}
}

查看createCoroutine声明

1
2
3
fun <T> (suspend () -> T).createCoroutine(
completion:Continuation<T>
): Continuation<Unit>
  • suspend () -> T是被suspend修饰的挂起函数,可以称作是协程体。
  • completion是协程完成后的回调
  • 返回值是一个Continuation,用于触发协程的启动,其实就是包了几层马甲的协程体

作用域 scope,可以在协程体中直接调用作用域对象的函数。挂起函数加了RestrictsSuspension注解就不能调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun <R, T> (suspend R.() -> T).createCoroutine(
receiver R,
completion: Continuation<T>
): Continuation<Unit>

// 封装一个启动协程的函数
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T>) {
override fun resumeWith(result: Result<Int>) {
println("Coroutine End: $result")
}
}
}

// 启动带有Receiver的协程
class ProducerScope<T> {
suspend fun produce(value: T) { ... }
}
fun callLaunchCoroutine() {
launchCoroutine(ProducerScope<Init>()) {
produce(1024)
}
}

挂起函数就是普通函数的参数中多了一个Continuation实例。任何一个挂起函数或者协程体都有一个Continuation实例,所以挂起函数一定要运行在挂起函数或者其它协程体中。

协程scope的概念

CoroutineScope,CoroutineScope可以理解为协程的作用域,会跟踪它使用 launch 或 async 创建的所有协程。您可以随时调用 scope.cancel() 以取消正在进行的工作(即正在运行的协程)。在 Android 中,某些 KTX 库为某些生命周期类提供自己的 CoroutineScope。例如,ViewModel 有 viewModelScope,Lifecycle 有 lifecycleScope。不过,与调度程序不同,CoroutineScope 不运行协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ExampleClass {

// Job and Dispatcher are combined into a CoroutineContext which
// will be discussed shortly
val scope = CoroutineScope(Job() + Dispatchers.Main)

fun exampleMethod() {
// Starts a new coroutine within the scope
scope.launch {
// New coroutine that can call suspend functions
fetchDocs()
}
}

fun cleanUp() {
// Cancel the scope to cancel ongoing coroutines work
scope.cancel()
}
}

使用launch 或 async 创建的协程会返回一个Job。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ExampleClass {
...
fun exampleMethod() {
// Handle to the coroutine, you can control its lifecycle
val job = scope.launch {
// New coroutine
}

if (...) {
// Cancel the coroutine started above, this doesn't affect the scope
// this coroutine was launched in
job.cancel()
}
}
}

Job有一个函数叫join(),它会阻塞当前协程,直到目标协程执行完成为止。

Android中的协程

常用启动协程的方式

  • launch 启动新协和而不将结果返回调用方。
  • async 启动新协程,并允许使用一个名为await的挂起函数返回结果。
  • withContext 挂起当前协程,本身就是一个挂起函数,作用等价于async{}.await()

通常应该用launch,只有另一个协程内时,或在挂起函数内且正在执行并行分解时,才使用 async。在suspend函数启动的所有协程,都必须在函数返回结果前停止。可以使用await()/awaitAll()保证协程在函数返回结果前完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
suspend fun fetchTwoDocs() =
coroutineScope {
val deferredOne = async { fetchDoc(1) }
val deferredTwo = async { fetchDoc(2) }
deferredOne.await()
deferredTwo.await()
}

suspend fun fetchTwoDocs() = // called on any Dispatcher (any thread, possibly Main)
coroutineScope {
val deferreds = listOf( // fetch two docs at the same time
async { fetchDoc(1) }, // async returns a result for the first doc
async { fetchDoc(2) } // async returns a result for the second doc
)
deferreds.awaitAll() // use awaitAll to wait for both network requests
}

调度器

  1. Dispatchers.Default: 用于执行 CPU 密集型的计算任务,比如对数据进行处理、转换等。它使用共享的线程池,并且适用于不会阻塞线程的操作。
  2. Dispatchers.Main: 用于在主线程中执行操作,比如更新 UI、处理用户交互等。在 Android 中,它与主线程关联,因此适用于 UI 操作。
  3. Dispatchers.Unconfined: 不受限制的调度器,它会在协程恢复执行时,继续使用调用者线程。但是,一旦恢复执行的代码开始执行挂起操作,调度器会切换到其他线程。
    1
    2
    3
    4
    5
    button.setOnclickListener {
    GlobalScope.launch(Dispatchers.Main) {
    // ..
    }
    }
    另外还有一个MainScop()函数,注意要在关闭的时候调用cancel
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //activity
    private val mainScope by lazy { MainScop() }

    button.setOnClickListener {
    mainScope.launch {
    // ui线程
    }
    }

    // onDestory 注意要调用cancel
    mainScope.cancel()

  • lifecycleScope.launch {}
  • viewModelScope.launch {}

常见函数

  • withTimeout(1000) : 超时取消

实用技巧

1
2
3
4
5
6
7
8
// 
suspend fun a() {

}
scope.launch(Dispatchers.IO) {
//...
a()
}

上方的代码存在的问题是,因为要想调用a(),每次必须显式在子线程中调用。可以将a改进,使用withContext封装

1
2
3
4
5
6
// 这种方式可以确保a的调用者不用关注在哪个线程中执行
supsend fun a() : Result {
return withContext(Dispatchers.IO) {
//...
}
}

改进后,要想调用a,必须使用协程。这种方式可以确保a的调用者不用关注在哪个线程中执行。

Channel & Flow

Channel (热流)

Channel相当于生产者消费者,先生产再消费,在协程中间建立一个缓存区。produce函数可以构建一个生产者协程,也可以在协程中只创建Channel。推荐用produce channel,不要用actor(obsolete)

1
2
3
4
5
6
7
8
9
10
11
// 相关代码
val channel = produce {
//...
send(it)
}
val channel = actor<Int> {
//...
val element = receiver()
}
channel.send()
channel.receive()

Channel的内部实现

1
2
3
4
5
6
7
8
9
10
11
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capcity) {
// 容量为1,send后,不调用receiver就一直挂起
RENDEZVOUS -> RendezvousChannel()
// 容量无限
UNLIMITED -> LinkedListChannel()
// 只保留最后一个
CONFLATED -> ConflatedChannel()
// 指定大小
else -> ArrayChannel(capacity)
}

Channel是需要关闭的,produce方法会在协程结束后自去关闭对应的_channel,所以不用担心。需要注意的是,一旦调用close,它的isClosoedForSend方法会立即返回true,但是因为里面还有元素,isClosedForReceiver要等全处理完才返回true

BroadcastChannel 顾名思义,广播所以receiver都会收到。需要注意,必须有一个订阅者,不然数据会被丢弃。

1
2
3
4
val broadcastChannel = BroadcastChannel<Int>(5)
val receiverChannel = broadcastChannel.openSubscription()
receiverChannle.receiver()
receiverChannle.cancel()

普通Channel也可以通过调用broadcast()函数来转换成broadcast

Flow (冷流)

flow 不消费不生产,不会立即执行flow的内容,会在collect的时候,才去执行。消费者协程cancel()后,flow会跟着结束(没有缓冲队列,区别channel)

flow的末端函数都是suspend函数

1
2
3
4
5
6
7
8
9
10
11
12
val flow = flow {
val data = "DATA"
emit(data)
}

launch {
flow.collect { data ->
//...
cancel()
}
}

不同的消费者协程不共享flow,如下列代码,flow中的代码会被多次执行。(每个消费者拥有独立的生产者)

1
2
3
4
5
6
7
repeat(2) {ci ->
launch {
flow.collect { data ->
//...
}
}
}
  • 如果在不同线程操作,可以使用flow.flowOn(Dispatchers.Main)进行上下文切换。
  • 如果要在Flow完成时执行逻辑,可以使用onCompletion,相当于try catch的finally

** 背压问题 ** 生产者生产太快,消费者跟不上。解决:

  1. 调用flow.buffer() 加入缓存
  2. 调用flow.conflate() 新数据替换旧数据
  3. 调用使用flow.conllectLatest() 和 conllect的区别是,它不会直接使用新数据覆盖老数据,会依序处理,但如果前一个没处理完的话会被取消。

channelFlow

1
2
3
4
5
6
channelFlow,使flow具体channel的特性
val flow = channelFlow {
withContext(Dispatcher.IO) {
send("DATA")
}
}

callbackFlow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val result = trySend("DATA")
result.onSuccess {

}.onFailure {

}.onClosed {

}
协程cancel后,会调用awaitClose
awaitClose {
//...
}

launch {
flow.collect {
//...
cancel()
}
}

冷流&热流

项目 冷流 热流
生产时间机 一旦创建立即生产 需要时生产
多个消费者 消费者按顺序获取 每个消费者有独立的生产线路
生命周期 生产者消费者无关系 生产者消费者生命周期一致

SharedFlow & StateFlow 热流

flow.ShareIn()方法,传递3个参数:scope协程域
flow.StateIn()

1
2
3
4
5
6
flow {...}.stateIn(scope)
//类似
flow {...}.shareIn(scope,
stared = SharingStarted.Eagerly,
replay = 1
)

stateFlow要和repeatOnLifecycle一起使用防止热流在view的无效生命周期更新

协程并发问题

Mutex

和线程锁类似,不过不会阻塞线程,只是挂起等待锁的释放

1
2
3
4
5
6
7
8
9
var count = 0
val mutex = mutex()
List(1000) {
GlobalScope.launch {
mutext.withLock {
count ++
}
}
}.joinAll()

Semaphore

信号量,信号量可以有多个,当参数为1时,效果等价于Mutex

1
2
3
4
5
6
7
8
val semaphore = Semaphore(1)
List(1000) {
GlobalScopre.launch {
semaphore.withPermit {
count++
}
}
}.joinAll()

注意

要注意避免访问外部状态,只能基于参数做运算,再通过返回值提供结果。

1
2
3
4
5
6
val count = 0
val result = count + List(1000) {
GlobalScope.async {1}
}.map {
it.await()
}.sum()

GlobalScope.async 创建了一个由 1000 个协程组成的集合,每个协程都会返回整数值 1。然后通过 map 函数等待所有协程的执行结果,并将结果求和。最终的结果会赋值给变量 result。