1. 首页

深入浅出 Kotlin 协程

本文要点

  • JVM 并没有提供对协程的原生支持
  • Kotlin 在编译器中实现协程是通过将其转换为一个状态机实现的
  • Kotlin 为实现使用了一个关键字,其余都是通过库来完成的
  • Kotlin 使用连续传递风格(Continuation Passing Style,CPS)来实现协程
  • 协程会使用到 Dispatcher,所以在 JavaFX、Android、Swing 等场景下,用法略有差异。

尽管协程并不是一个新的话题,但它是一个很吸引人的话题。正如其他地方的文档所述,协程在这些年里被重新发现了很多次,特别是当需要某种形式的轻量级线程和 / 或寻找“回调地狱”的解决方案时。

最近,协程已经成为 JVM 上反应式编程的一个流行的替代方案。像 RxJavaReactor 项目这样的框架为客户端提供了一种渐进式处理传入信息的方式,并对节流和并行提供了广泛的支持。但是,我们必须围绕反应流的函数式操作来重构代码,在许多情况下,这么做的成本大于收益

举例来讲,这是 Android 社区需要更简单替代方案的原因。Kotlin 语言引入协程作为实验性的特性以满足该需求,在一些改进之后,它们成为该语言 1.3 版本的官方特性。Kotlin 协程的应用已经从 UI 开发扩展到服务器端框架 (如 Spring 5 中添加的支持),甚至包括像 Arrow(通过 Arrow Fx ) 这样的函数式框架。

理解协程的挑战

令人遗憾的是,理解协程并不是一件容易的任务。尽管有许多 Kotlin 专家所做的关于协程的演讲,其中很多都是启发性的并且包含非常多的信息,但是想要简单地知道协程是什么(或者它该怎样使用)并不容易。你可能会说协程就是并行编程的等价品。

导致该问题的部分原因在于底层实现。在 Kotlin 协程中,编译器只实现了一个suspend关键字,其他的事情都是由协程库处理的。因此,Kotlin 协程非常强大和灵活,但在结构上却不那么固定。对于初学者来说,这是一个学习的障碍,因为他们学习的最好方法是遵循坚实的指导方针和严格的原则。本文会从底层开始介绍协程,希望能够为读者提供这样的基础知识。

我们的示例应用(服务器端)

我们的应用程序将建立在安全有效地对 RESTful 服务进行多次调用的规范性(canonical)问题之上。我们将播放 Where’s Waldo 的一个文本版本——在这个版本中,用户遵循一系列的名称,直到他们找到“Waldo”。

下面是完整的 RESTful 服务,它是使用 Http4k 编写的。Http4k 是由 Marius Eriksen 撰写的著名论文中描述的函数式服务器架构的Kotlin 版本。该实现存在许多其他语言的版本,包括Scala( Http4s )和 Java 8 或更高版本( Http4j )。

这里只有一个端点,它通过 Map 实现了一个名字的链。给定一个名字,我们要么以 200 状态码返回匹配的值,要么以 404 状态码返回一条错误信息。

fun main() {
   val names = mapOf(
       "Jane" to "Dave",
       "Dave" to "Mary",
       "Mary" to "Pete",
       "Pete" to "Lucy",
       "Lucy" to "Waldo"
   )
   val lookupName = { request: Request ->
       val name = request.path("name")
       val headers = listOf("Content-Type" to "text/plain")
       val result = names[name]
       if (result != null) {
           Response(OK)
               .headers(headers)
               .body(result)
       } else {
           Response(NOT_FOUND)
               .headers(headers)
               .body("No match for $name")
       }
   }
   routes(
       "/wheresWaldo" bind routes(
           "/{name:.*}" bind Method.GET to lookupName
       )
   ).asServer(Netty(8080))
       .start()
}


实际上,我们希望客户端发送如下的请求链:

深入浅出Kotlin协程

我们的示例应用(客户端)

我们的客户端应用将会基于 JavaFX 库来创建用户界面。但是,为了简化我们的任务并避免不必要的细节,我们将会使用 TornadoFX ,它在 JavaFX 之上提供了一个 Kotlin DSL。

如下是客户端视角的完整定义:

class HelloWorldView: View("Coroutines Client UI") {
   private val finder: HttpWaldoFinder by inject()
   private val inputText = SimpleStringProperty("Jane")
   private val resultText = SimpleStringProperty("")
   override val root = form {
       fieldset("Lets Find Waldo") {
           field("First Name:") {
               textfield().bind(inputText)
               button("Search") {
                   action {
                       println("Running event handler".addThreadId())
                       searchForWaldo()
                   }
               }
           }
           field("Result:") {
               label(resultText)
           }
       }
   }
   private fun searchForWaldo() {
       GlobalScope.launch(Dispatchers.Main) {
           println("Doing Coroutines".addThreadId())
           val input = inputText.value
           val output = finder.wheresWaldo(input)
           resultText.value = output
       }
   }
}
class HelloWorldView: View("Coroutines Client UI") {   private val finder: HttpWaldoFinder by inject()   private val inputText = SimpleStringProperty("Jane")   private val resultText = SimpleStringProperty("")   override val root = form {       fieldset("Lets Find Waldo") {           field("First Name:") {               textfield().bind(inputText)               button("Search") {                   action {                       println("Running event handler".addThreadId())                       searchForWaldo()                   }               }           }           field("Result:") {               label(resultText)           }       }   }   private fun searchForWaldo() {       GlobalScope.launch(Dispatchers.Main) {           println("Doing Coroutines".addThreadId())           val input = inputText.value           val output = finder.wheresWaldo(input)           resultText.value = output       }   }}


我们还会使用如下的辅助函数作为 String 类型的扩展:


fun String.addThreadId() = "$this on thread ${Thread.currentThread().id}"

在运行的时候,UI 如下所示:

深入浅出Kotlin协程

当用户点击按钮的时候,我们会创建一个新的协程,并通过“HttpWaldoFinder”类型的服务对象访问 RESTful 端点。

Kotlin 协程存在于一个“CoroutineScope”中,而后者又会反过来和某种 Dispatcher 关联,Dispatcher 代表了底层的并发模型。并发模型通常是一个线程池,但是会有所差异。

具体哪些 Dispatcher 可用取决于 Kotlin 代码运行的环境。Main Dispatcher 表示 UI 库的事件处理线程,因此 (在 JVM 上) 只能在 Android、JavaFX 和 Swing 中使用。最初,Kotlin Native 根本不支持协程的多线程处理,但这种情况正在改变。在服务器端,我们可以自己引入协程,但是在越来越多的情况中,协程默认就是可用的,比如在 Spring 5 中。

在调用挂起(suspending)方法之前,我们必须将协程、“CoroutineScope”和“Dispatche”准备就绪。如果这是初始调用的话(如上面的代码所示),我们可以通过“协程构造者”函数,如“launch”和“async”,启动该过程。

不管是调用协程构建者函数,还是像“withContext”这样的作用域函数,都会创建一个新的“CoroutineScope”。在该作用域中,任务体现为“Job”实例的层级结构。

它们有一些很有意思的属性,即:

  • Job 会等待其块内所有协程完成后才能完成自己。
  • 取消一个 Job 会导致其所有的子 Job 都被取消。
  • 子 Job 的失败或取消会传播至父 Job。

这样设计的目的是避免并发编程中的常见问题,比如杀死父 Job 的时候没有终结其子 Job。

访问 REST 端点的服务

如下是 HttpWaldoFinder 服务的完整代码:

class HttpWaldoFinder : Controller(), WaldoFinder {
   override suspend fun wheresWaldo(starterName: String): String {
       val firstName = fetchNewName(starterName)
       println("Found $firstName name".addThreadId())
       val secondName = fetchNewName(firstName)
       println("Found $secondName name".addThreadId())
       val thirdName = fetchNewName(secondName)
       println("Found $thirdName name".addThreadId())
       val fourthName = fetchNewName(thirdName)
       println("Found $fourthName name".addThreadId())
       return fetchNewName(fourthName)
   }
   private suspend fun fetchNewName(inputName: String): String {
       val url = URI("http://localhost:8080/wheresWaldo/$inputName")
       val client = HttpClient.newBuilder().build()
       val handler = HttpResponse.BodyHandlers.ofString()
       val request = HttpRequest.newBuilder().uri(url).build()
       return withContext<String>(Dispatchers.IO) {
           println("Sending HTTP Request for $inputName".addThreadId())
           client
               .send(request, handler)
               .body()
       }
   }
}


“fetchNewName”函数会接收一个已知的名字,并查询端点获取关联的名字。这是通过使用“HttpClient”类型来实现的,该类型是从 Java 11 开始作为标准的。实际的 HTTP GET 会在一个新的子协程中运行,该子协程会使用 IO Dispatcher。它的表现形式是一个为长时间运行的活动(如网络调用)优化的线程池。

“wheresWaldo”函数会根据名字链执行五次,以便于(尽力)找到 Waldo。我们随后将会分解生成的字节码,所以我们让实现尽可能地简单。我们感兴趣的是,每次调用“fetchNewName”都会导致当前协程在子协程运行时被挂起。在这个特殊场景下,父 Job 运行在 Main Dispatcher 上,而子 Job 运行在 IO Dispatcher 上。因此,当子 Job 执行 HTTP 请求时,UI 事件处理线程会被释放出来处理与视图的用户交互。如下图所示。

深入浅出Kotlin协程

IntelliJ 会为我们展示何时发起挂起调用,因此会在协程间转换控制。需要注意,如果我们没有切换 Dispatcher 的话,那发起调用不一定会创建新的协程。当一个挂起函数调用另一个挂起函数时,可能会在相同的协程中继续,如果我们真的希望保持在同一个线程上,那么这的确就是我们想要的行为。

深入浅出Kotlin协程

当我们执行客户端的时候,如下就是控制台的输出:

深入浅出Kotlin协程

我们可以看到,在这个特殊的场景下,Main Dispatcher/UI 事件处理器运行在 17 号线程上,而 IO Dispatcher 运行在线程池上,包括 24 号和 26 号线程。

开始我们的调查

借助 IntelliJ 自带的字节码分解工具,我们可以看出到底发生了什么。另外,我们也可以使用 JDK 提供的标准“javap”工具。

深入浅出Kotlin协程

我们可以看到“HttpWaldoFinder”的方法改变了其签名,所以它们可以接受一个 Continuation 对象作为其额外的参数,并且返回某种通用的对象。

public final class HttpWaldoFinder extends Controller implements WaldoFinder {
  public Object wheresWaldo(String a, Continuation b)
  final synthetic Object fetchNewName(String a, Continuation b)
}

现在,我们深入代码看一下这些方法都添加了些什么,并阐述“Continuation”是什么以及现在返回的是什么。

连续传递风格(Continuation Passing Style,CPS)

按照 Kotlin 标准化过程对协程提议的文档描述,协程的实现是基于连续传递风格 (Continuation Passing Style,CPS) 的。会有一个 continuation 对象来存储函数在挂起阶段所需的状态。

在本质上,挂起函数的每个局部变量都会成为 continuation 的一个字段。另外,还需要创建字段来存储所有的参数和当前对象(如果函数是方法的话)。所以,一个挂起方法如果有四个参数和五个局部变量的话,continuation 至少要有 10 个字段。

在“HttpWaldoFinder”的“wheresWaldo”方法中,有一个参数和四个本地变量,所以我们预期 continuation 实现类型会有六个字段。如果我们将 Kotlin 编译器生成的字节码分解成 Java 源码的话,就会发现事实确实如此:

$continuation = new ContinuationImpl($completion) {
  Object result;
  int label;
  Object L$0;
  Object L$1;
  Object L$2;
  Object L$3;
  Object L$4;
  Object L$5;
  @Nullable
  public final Object invokeSuspend(@NotNull Object $result) {
     this.result = $result;
     this.label |= Integer.MIN_VALUE;
     return HttpWaldoFinder.this.wheresWaldo((String)null, this);
  }
};

鉴于所有的字段都是 Object 类型的,所以它们该如何使用并不明显。随着进一步探索,我们将会看到:

  • “L$0”持有对“HttpWaldoFinder”实例的引用。它始终都会存在。
  • “L$1”持有“starterName”参数的值。它始终都会存在。
  • “L2”到“L2”到“L2”到“L5”持有本地变量的值。随着代码的执行,它们将会渐进式地填充进来。“L$2”会持有“firstName”的值,以此类推。

我们还有额外的字段存储最终结果,有一个名为“label”的有趣的整型字段。

挂起还是不挂起——这是一个问题

当我们检查生成的代码时,需要记住它要处理两个场景。每当一个挂起的函数调用另一个函数时,它可能会挂起当前的协程(这样另外一个函数可以在相同的线程上运行),也可能在当前协程会继续执行。

我们考虑一个从数据存储中读取值的挂起函数。当 I/O 发生的时候,它很可能会被挂起,但是它也可能会缓存结果。后续调用可以同步返回缓存的值,不需要任何的挂起。Kotlin 编译器所生成的代码必须要同时支持这两种路径。

Kotlin 会调整每个挂起函数的返回类型,这样的话,它要么返回真正的结果,要么返回特殊的值 COROUTINE_SUSPENDED。如果是后者的话,当前的协程会被挂起。这也是为什么挂起函数的返回类型从结果类型变成了“Object”。

在我们的样例应用中,“wheresWaldo”会重复调用“fetchNewName”。在理论上,每次这样的调用都可能挂起或不挂起当前的协程。按照我们编写“fetchNewName”的方式,可以知道,挂起始终都会发生。但是,为了理解生成的代码,我们必须记住它需要处理所有的可能性。

大的 Switch 语句和 Label

如果我们进一步查看分解后的代码,会发现一个隐藏在多个嵌套 label 中的 switch 语句。这是一个状态机的实现,用来在 wheresWaldo() 方法中控制不同的挂起点。下面是整体的结构:

// 程序清单 1:生成的 switch 语句和 label
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
  label47: {
     label46: {
        Object $result = $continuation.result;
        var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch($continuation.label) {
        case 0:
            // 省略代码
        case 1:
            // 省略代码
        case 2:
            // 省略代码
        case 3:
            // 省略代码
        case 4:
            // 省略代码
        case 5:
            // 省略代码
        default:
           throw new IllegalStateException(
                "call to 'resume' before 'invoke' with coroutine");
        } // 结束 switch
        // 省略代码
    } // 结束 label 46
    // 省略代码
  } // 结束 label 47
  // 省略代码
} // 结束 label 48
// 省略代码

我们现在可以看到 continuation 中“label”字段的目的了。当完成“wheresWaldo”的不同阶段时,我们将会变更“label”中的值。嵌套的 label 代码块中包含了原始 Kotlin 代码里面挂起点之间的代码块。这个“label”值允许重新进入该代码,跳到最后挂起的地方(适当的 case 语句),以便于从 continuation 中抽取数据,然后跳转到正确的 label 代码块。

但是,如果所有的挂起点都没有真正挂起的话,整个代码块可以同步执行。在生成的代码中,我们经常会看到这样的片段:

// 程序清单 2:确定当前的协程是否应该挂起
if (var10000 == var11) {
  return var11;
}

从上面我们可以看到,“var11”被设置成了CONTINUATION_SUSPENDED的值,而“var10000”保存了对另一个挂起函数的调用的返回值。因此,当挂起发生时,代码将返回 (稍后会重新进入),如果没有发生挂起,则代码将通过切换到适当的 label 块继续执行函数的下一部分。

再次强调,请记住,生成的代码不能假设所有调用都将挂起,或者所有调用都将继续使用当前的协程。它必须能够处理任何可能的组合。

跟踪执行

当我们开始执行时,continuation 中“label”的值将会被置为零。如下是对应的 switch 语句分支:

// 程序清单 3:switch 的第一个分支
case 0:
  ResultKt.throwOnFailure($result);
  $continuation.L$0 = this;
  $continuation.L$1 = starterName;
  $continuation.label = 1;
  var10000 = this.fetchNewName(starterName, $continuation);
  if (var10000 == var11) {
     return var11;
  }
  break;

我们将实例和参数存储到了 continuation 对象中,然后将 continuation 对象传递给“fetchNewName”。如前文所述,编译器所生成的“fetchNewName”版本要么返回实际的结果,要么返回COROUTINE_SUSPENDED值。

如果协程挂起的话,那么我们会从函数中返回,并且当我们恢复的时候,会跳入“case 1”分支。如果我们继续使用当前的协程,那么会跳出 switch 到某一个 label 代码块,进入如下的代码:

// 程序清单 4:第二次调用“fetchNewName”
firstName = (String)var10000;
secondName = UtilsKt.addThreadId("Found " + firstName + " name");
boolean var13 = false;
System.out.println(secondName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.label = 2;
var10000 = this.fetchNewName(firstName, $continuation);
if (var10000 == var11) {
  return var11;
}

因为我们知道“var10000”包含了我们期望的返回值,所以我们可以将其转换成正确的类型并存储到局部变量“firstName”中。随后,生成的代码会使用“secondName”来存储线程 id 连接的结果,并且将其打印了出来。

我们更新了 continuation 中的字段,添加了服务器检索到的值。注意,“label”的值现在已经是 2 了。随后,我们第三次调用“fetchNewName”。

第三次调用“fetchNewName”——无挂起

我们需要再次基于“fetchNewName”返回的值做出选择,如果返回的值是COROUTINE_SUSPENDED的话,我们会从当前函数返回。当下一次调用时,我们要遵循 switch 的“case 2”分支。

如果我们继续使用当前协程的话,那么将会执行如下的代码块。我们可以看到它与上面的代码是相同的,只不过我们现在要有更多的数据存储到 continuation 中。

// 程序清单 4:第三次调用“fetchNewName”
secondName = (String)var10000;
thirdName = UtilsKt.addThreadId("Found " + secondName + " name");
boolean var14 = false;
System.out.println(thirdName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.L$3 = secondName;
$continuation.label = 3;
var10000 = this.fetchNewName(secondName, (Continuation)$continuation);
if (var10000 == var11) {
  return var11;
}

这种模式在后续的调用中会重复(假定始终没有返回COROUTINE_SUSPENDED),直到到达终点为止。

第三次调用“fetchNewName”——带有挂起

或者,如果协程已经被挂起的话,那么将会运行如下的代码块:

// 程序清单 5:switch 的第三个分支
case 2:
  firstName = (String)$continuation.L$2;
  starterName = (String)$continuation.L$1;
  this = (HttpWaldoFinder)$continuation.L$0;
  ResultKt.throwOnFailure($result);
  var10000 = $result;
  break label46;

我们将 continuation 中的值抽取到函数的局部变量中。随后使用一个 label 形式的 break 使执行跳转至前述的程序清单 4 中。所以最终,我们会在相同的地方结束。

总结执行过程

现在,我们可以重新看一下程序清单的代码结构,并在整体上描述一下每个区域中都发生了什么:

// 程序清单 6:深度解析生成的 switch 语句和 label
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
  label47: {
     label46: {
        Object $result = $continuation.result;
        var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch($continuation.label) {
       case 0:
            // 将 label 设置为 1,如果返回挂起的话,第一次调用“fetchNewName”
            // 否则,从 switch 中 break 退出
        case 1:
            // 从 continuation 中抽取参数
            // 从 switch 中 break 退出
        case 2:
            // 从 continuation 中抽取参数和第一个结果
            // 跳转到外边的“label46”
        case 3:
            // 从 continuation 中抽取参数,第一个和第二个结果
            // 跳转到外边的“label47”
        case 4:
            // 从 continuation 中抽取参数,第一个、第二个和第三个结果
            // 跳转到外边的“label48”
        case 5:
            // 从 continuation 中抽取参数,第一个、第二个、第三个和第四个结果
            // 返回最后的结果
        default:
           throw new IllegalStateException(
                "call to 'resume' before 'invoke' with coroutine");
        } // 结束 switch
        // 存储参数和第一个结果到 continuation 中
        // 如果返回挂起的话,将 label 设置为 2 并对“fetchNewName”进行第二次调用
        // 否则的话继续进行
    } // 结束 label 46
        // 存储参数、第一个结果和第二个结果到 continuation 中
        // 如果返回挂起的话,将 label 设置为 3 并对“fetchNewName”进行第三次调用
        // 否则的话继续进行
  } // 结束 label 47
        //  存储参数、第一个结果、第二个结果和第三个结果到 continuation 中
        //  如果返回挂起的话,将 label 设置为 4 并对“fetchNewName”进行第四次调用
        // 否则的话继续进行
} // 结束 label 48
// 存储参数、第一个结果、第二个结果、第三个结果和第四个结果到 continuation 中
// 将 label 设置为 5 并对“fetchNewName”进行第五次调用
// 返回最终结果或 COROUTINE_SUSPENDED


结果

这个代码库理解起来并不简单。我们分析了字节码分解得到的 Java 代码,这些字节码是由 Kotlin 编译器中的代码生成器所产生的。这个代码生成器的输出在设计时考虑的是效率和最小化,而不是可理解性。

但是,我们可以得出一些有用的结论:

  1. 并没有什么魔法。当开发人员第一次开始学习协程时,很容易会认为有些特殊的“魔法”将所有的这些事情连接在了一起。我们可以看到,生成的代码只使用了一些过程式编程的基本构建块,比如条件语句和带有 label 的 break。
  2. 实现是基于 continuation 的。在最初的 KEEP 提议中,函数挂起和恢复的方法是将函数的状态缓存在一个对象中。因此,对于每个挂起函数,编译器将创建一个包含 N 个字段的 continuation 类型,其中 N 是参数的数量加上字段数再加上 3。最后的三个字段保存当前对象、最终结果和索引。
  3. 执行始终遵循一个标准的模式。如果从挂起中恢复,那么我们要使用 continuation 的“label”字段跳转到 switch 语句的适当分支。在这个分支中,我们从 continuation 对象检索到目前为止已经找到的数据,然后使用一个带有 label 的 break 跳转到没有发生挂起的代码,这些代码本来是要直接执行的。

作者简介

Garth Gilmour 是 Instil 的学习主管(Head of Learning)。早在 1999 年,他就放弃了全职的开发工作,开始讲授 C++ 给 C 程序员,然后讲授 Java 给 C++ 程序员,然后是讲授 C#给 Java 程序员,现在他什么都教,但更喜欢 Kotlin 相关的工作。如果计算学员数量的话,很久之前就超过 1000 人了。他是 20 多门课程的作者,经常在技术会议上发言,在国内和国际会议上演讲,共同组织了贝尔法斯特 BASH 的开发人员系列活动,最近成立了贝尔法斯特 Kotlin 用户组。不在白板之前的时候,他还担任近身格斗和举重的教练。

Eamonn Boyle 有超过 15 年的开发、架构和团队领导经验。在过去的 4 年里,他一直担任全职培训师和教练,为各种各样的客户撰写和讲授各种主题的课程。其中包括范式和技术,从核心语言技能、框架到工具和过程。他还在一系列会议、活动和技术聚会上发表演讲并举办 workshop,其中包括 Goto 和 KotlinConf。

原文链接:

A Bottom-Up View of Kotlin Coroutines

书籍推荐

作者:Eamonn Boyle
链接:https://www.infoq.cn/article/RuBcSfg9bFt4gwdToyd7

看完两件小事

如果你觉得这篇文章对你挺有启发,我想请你帮我两个小忙:

  1. 关注我们的 GitHub 博客,让我们成为长期关系
  2. 把这篇文章分享给你的朋友 / 交流群,让更多的人看到,一起进步,一起成长!
  3. 关注公众号 「画漫画的程序员」,公众号后台回复「资源」 免费领取我精心整理的前端进阶资源教程

JS中文网是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。欢迎热爱技术的你一起加入交流与学习,JS中文网的使命是帮助开发者用代码改变世界

本文著作权归作者所有,如若转载,请注明出处

转载请注明:文章转载自「 Js中文网 · 前端进阶资源教程 」https://www.javascriptc.com

标题:深入浅出 Kotlin 协程

链接:https://www.javascriptc.com/3755.html

« 轻松搭建基于 SpringBoot + Vue 的 Web 商城应用
我们分析了 GitHub 上 5.46 亿条日志,发现中国开源虽然贡献大但还有这些不足…»
Flutter 中文教程资源

相关推荐

QR code