协程
Kotlin 协程 是可挂起计算的实例,允许以命令式方式编写非阻塞代码。在语言层面,
挂起函数为异步操作提供了抽象,而在库层面,
kotlinx.coroutines 提供了如
launch
这样的函数和如 CoroutineScope 这样的类型。
Spring Framework 在以下作用域中提供了对协程的支持:
-
在Spring MVC和WebFlux注解中支持挂起函数
@Controller -
WebFlux.fn 路由 { } DSL
-
WebFlux
CoWebFilter -
挂起函数和 RSocket
@MessageMapping注解方法中的Flow支持 -
用于
RSocketRequester的扩展 -
Spring AOP
依赖项
当 kotlinx-coroutines-core 和 kotlinx-coroutines-reactor 依赖项在类路径中时,启用协程支持:
build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
版本 1.4.0 及以上受支持。
响应式如何转换为协程?
对于返回值,从响应式编程到协程 API 的转换如下:
-
fun handler(): Mono<Void>变为suspend fun handler() -
fun handler(): Mono<T>变为suspend fun handler(): T或suspend fun handler(): T?,具体取决于Mono是否可以为空(具有更静态类型的优点) -
fun handler(): Flux<T>变为fun handler(): Flow<T>
对于输入参数:
-
如果不需要惰性,
fun handler(mono: Mono<T>)变为fun handler(value: T),因为可以调用挂起函数来获取值参数。 -
如果需要懒加载,
fun handler(mono: Mono<T>)变为fun handler(supplier: suspend () → T)或fun handler(supplier: suspend () → T?)
Flow 在协程世界中等同于 Flux,适用于热流或冷流、有限流或无限流,主要有以下区别:
-
Flow是基于推送的,而Flux是推送-拉取混合模式 -
背压是通过挂起函数实现的
-
Flow只有一个 挂起的collect方法,而操作符作为 扩展 实现 -
操作符的实现非常简单,这要归功于协程
-
扩展可以用来向
Flow添加自定义运算符 -
集合操作是挂起函数
-
map运算符 支持异步操作(不需要flatMap),因为它接受一个挂起函数参数
阅读此博客文章 使用 Spring、协程和 Kotlin Flow 实现响应式编程 以获取更多信息,包括如何使用协程同时运行代码。
控制器
这是一个协程 @RestController 的示例。
@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {
@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}
@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}
@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}
@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}
@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}
@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}
@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}
}
使用 @Controller 进行视图渲染也受支持。
@Controller
class CoroutinesViewController(banner: Banner) {
@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}
WebFlux.fn
这是一个通过 coRouter { } DSL 和相关处理程序定义的协程路由器示例。
@Configuration
class RouterConfiguration {
@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
class UserHandler(builder: WebClient.Builder) {
private val client = builder.baseUrl("...").build()
suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))
suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}
事务
通过响应式事务管理的编程变体,Coroutines上的事务得到了支持。
对于挂起函数,提供了一个 TransactionalOperator.executeAndAwait 扩展。
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
对于 Kotlin Flow,提供了 Flow<T>.transactional 扩展。
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}
上下文传播
Spring应用通过集成Micrometer来支持可观察性。Micrometer用于观测支持。
对于追踪支持,当前的观测信息会通过ThreadLocal传递给阻塞代码,
或是通过Reactor的Context传递给响应式管道。但同时,当前的观测信息也需要在挂起函数的执行上下文中可用。
若没有这一点,当前的"traceId"将不会自动添加到协程的日志语句前。
PropagationContextElement 运算符通常确保
Micrometer 上下文传播库 与 Kotlin 协程一起正常工作。
它需要 io.micrometer:context-propagation 依赖,可选地还包括 org.jetbrains.kotlinx:kotlinx-coroutines-reactor 依赖。通过 CoroutinesUtils#invokeSuspendingFunction 实现的自动上下文传播(Spring 使用来将协程适配到 Reactor 的 Flux 或 Mono)可以通过调用 Hooks.enableAutomaticContextPropagation() 来启用。
应用程序还可以显式使用 PropagationContextElement 来结合上下文传播机制增强 CoroutineContext:
fun main() {
runBlocking(Dispatchers.IO + PropagationContextElement()) {
waitAndLog()
}
}
suspend fun waitAndLog() {
delay(10)
logger.info("Suspending function with traceId")
}
在这里,假设Micrometer Tracing已被配置,生成的日志语句将显示当前的"traceId",从而为您的应用解锁更佳的可观察性。