此版本仍在开发中,尚未稳定。如需最新的稳定版本,请使用 Spring Framework 7.0.6spring-doc.cadn.net.cn

协程

Kotlin 协程 是可挂起计算的实例,允许以命令式方式编写非阻塞代码。在语言层面, 挂起函数为异步操作提供了抽象,而在库层面, kotlinx.coroutines 提供了如 launch 这样的函数和如 CoroutineScope 这样的类型。spring-doc.cadn.net.cn

Spring Framework 在以下作用域中提供了对协程的支持:spring-doc.cadn.net.cn

依赖项

kotlinx-coroutines-corekotlinx-coroutines-reactor 依赖项在类路径中时,启用协程支持:spring-doc.cadn.net.cn

build.gradle.ktsspring-doc.cadn.net.cn

dependencies {

	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

版本 1.4.0 及以上受支持。spring-doc.cadn.net.cn

响应式如何转换为协程?

对于返回值,从响应式编程到协程 API 的转换如下:spring-doc.cadn.net.cn

  • fun handler(): Mono<Void> 变为 suspend fun handler()spring-doc.cadn.net.cn

  • fun handler(): Mono<T> 变为 suspend fun handler(): Tsuspend fun handler(): T?,具体取决于 Mono 是否可以为空(具有更静态类型的优点)spring-doc.cadn.net.cn

  • fun handler(): Flux<T> 变为 fun handler(): Flow<T>spring-doc.cadn.net.cn

对于输入参数:spring-doc.cadn.net.cn

  • 如果不需要惰性,fun handler(mono: Mono<T>) 变为 fun handler(value: T),因为可以调用挂起函数来获取值参数。spring-doc.cadn.net.cn

  • 如果需要懒加载,fun handler(mono: Mono<T>) 变为 fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)spring-doc.cadn.net.cn

Flow 在协程世界中等同于 Flux,适用于热流或冷流、有限流或无限流,主要有以下区别:spring-doc.cadn.net.cn

阅读此博客文章 使用 Spring、协程和 Kotlin Flow 实现响应式编程 以获取更多信息,包括如何使用协程同时运行代码。spring-doc.cadn.net.cn

控制器

这是一个协程 @RestController 的示例。spring-doc.cadn.net.cn

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}

使用 @Controller 进行视图渲染也受支持。spring-doc.cadn.net.cn

@Controller
class CoroutinesViewController(banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn

这是一个通过 coRouter { } DSL 和相关处理程序定义的协程路由器示例。spring-doc.cadn.net.cn

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}
class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

事务

通过响应式事务管理的编程变体,Coroutines上的事务得到了支持。spring-doc.cadn.net.cn

对于挂起函数,提供了一个 TransactionalOperator.executeAndAwait 扩展。spring-doc.cadn.net.cn

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

	suspend fun initDatabase() = operator.executeAndAwait {
		insertPerson1()
		insertPerson2()
	}

	private suspend fun insertPerson1() {
		// INSERT SQL statement
	}

	private suspend fun insertPerson2() {
		// INSERT SQL statement
	}
}

对于 Kotlin Flow,提供了 Flow<T>.transactional 扩展。spring-doc.cadn.net.cn

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

	fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

	private fun findPeople(): Flow<Person> {
		// SELECT SQL statement
	}

	private suspend fun updatePerson(person: Person): Person {
		// UPDATE SQL statement
	}
}

上下文传播

Spring应用通过集成Micrometer来支持可观察性。Micrometer用于观测支持。 对于追踪支持,当前的观测信息会通过ThreadLocal传递给阻塞代码, 或是通过Reactor的Context传递给响应式管道。但同时,当前的观测信息也需要在挂起函数的执行上下文中可用。 若没有这一点,当前的"traceId"将不会自动添加到协程的日志语句前。spring-doc.cadn.net.cn

PropagationContextElement 运算符通常确保 Micrometer 上下文传播库 与 Kotlin 协程一起正常工作。spring-doc.cadn.net.cn

它需要 io.micrometer:context-propagation 依赖,可选地还包括 org.jetbrains.kotlinx:kotlinx-coroutines-reactor 依赖。通过 CoroutinesUtils#invokeSuspendingFunction 实现的自动上下文传播(Spring 使用来将协程适配到 Reactor 的 FluxMono)可以通过调用 Hooks.enableAutomaticContextPropagation() 来启用。spring-doc.cadn.net.cn

应用程序还可以显式使用 PropagationContextElement 来结合上下文传播机制增强 CoroutineContextspring-doc.cadn.net.cn

fun main() {
	runBlocking(Dispatchers.IO + PropagationContextElement()) {
		waitAndLog()
	}
}

suspend fun waitAndLog() {
	delay(10)
	logger.info("Suspending function with traceId")
}

在这里,假设Micrometer Tracing已被配置,生成的日志语句将显示当前的"traceId",从而为您的应用解锁更佳的可观察性。spring-doc.cadn.net.cn