1-Intro
微服务体系中 Application 和 Application 间会用 MQ, 例如 RocketMQ , Kafka, Pular 来 解耦 ;
应用体系中 比如 layer-based-style 风格中,同层之间 也会用 事件机制来进行 解耦 , 典型的技术方案比如:
Spring Application EventEventBus
2-Spring Event
QuickStart
class SpringEventListenerTest {
@Test
fun `test publish`() {
val ctx = AnnotationConfigApplicationContext(SpringEventListenerTestConfig::class.java)
val bean = ctx.getBean(SpringEventListenerTestPublisher::class.java)
bean.publishEvent("我是一个小消息")
}
}
@Configuration
open class SpringEventListenerTestConfig {
/**
* 开启这个配置会 实现 异步化消费消息, 是一个全局配置
*/
@Bean(name = ["applicationEventMulticaster"])
open fun simpleApplicationEventMulticaster(): ApplicationEventMulticaster {
val eventMulticaster =
SimpleApplicationEventMulticaster()
eventMulticaster.setTaskExecutor(SimpleAsyncTaskExecutor())
return eventMulticaster
}
}
data class CustomSpringEvent(
val message: String,
val source: SpringEventListenerTestPublisher
) : ApplicationEvent(source)
@Component
class SpringEventListenerTestPublisher(
private val applicationEventPublisher: ApplicationEventPublisher
) {
fun publishEvent(message: String) {
applicationEventPublisher.publishEvent(CustomSpringEvent(message, this))
logger.info("生产者发送了消息:{}", message)
}
companion object {
private val logger = LogManager.getLogger(SpringEventListenerTestPublisher::class.java)
}
}
@Component
class SpringEventListenerTestListener : ApplicationListener<CustomSpringEvent> {
override fun onApplicationEvent(event: CustomSpringEvent) {
logger.info("接收到了消息: ${event.message}")
}
companion object {
private val logger = LogManager.getLogger(SpringEventListenerTestPublisher::class.java)
}
}
@Component
class SpringEventListenerTestAnnoListener {
@EventListener
fun onApplicationEvent(event: CustomSpringEvent) {
logger.info("Annotation Event 接收到了消息: ${event.message}")
}
companion object {
private val logger = LogManager.getLogger(SpringEventListenerTestAnnoListener::class.java)
}
}输出结果:
2024-06-13 16:16:19.896 INFO
[TraceId: , SpanId: ] --- [Executor-1]
ringEventListenerTestPublisher : 接收到了消息: 我是一个小消息
2024-06-13 16:16:19.896 INFO
[TraceId: , SpanId: ] --- [Executor-2]
gEventListenerTestAnnoListener : Annotation Event 接收到了消息: 我是一个小消息
2024-06-13 16:16:19.901 INFO
[TraceId: , SpanId: ] --- [ main]
ringEventListenerTestPublisher : 生产者发送了消息:我是一个小消息
- 注意到 消费者使用的是 异步的线程池
- 使用
simpleApplicationEventMulticaster开启了全局异步化
Tips
如果配置了一个 applicationEventMulticaster Bean,那么所有通过 Spring 的事件机制发布的事件,包括 Spring 自身使用的内部事件(如上下文刷新、自定义应用事件等),默认都会使用您配置的这个 applicationEventMulticaster 和对应的线程池。这是因为 applicationEventMulticaster 是 Spring 的全局事件广播器。
Tips
一种简单的方案是在 listener 中显示使用异步调度方案
高阶用法
GenericSpringEvent: 支持泛型 事件, Spring 根据监听器声明的泛型类型匹配事件,以确保每个监听器只接收其关心的具体泛型类型的事件 ;- 支持条件
EL表达式, 例如:@EventListener(condition = "#event.success")这样的条件引擎
3-EventBus
官方文档 非常 不推荐现在的 eventBus 方案. 考虑到如下的因素, 也没有维护下去了感觉.
- 交叉引用难以追踪
- 内部使用了反射机制
- 缺乏对多个事件的等待机制
- 不支持
backpressure的精确控制 - 对线程的控制 支持很少
- 缺乏监控能力
- 异常处理不足,不支持 传播异常事件
- 对
RxJava协程等机制的支持不足 - 性能差,尤其在 安卓上
- 不支持泛型
- 设计的时候在
java8之前
quick Start
internal class GuavaEventBusTst {
@Test
fun `test publish`() {
val ctx = AnnotationConfigApplicationContext(GuavaEventBusTstCfg::class.java)
val bean = ctx.getBean(EventBus::class.java)
println(bean)
bean.post(GuavaEventBusTstEvent("1"))
Thread.sleep(10L) /*junit sleep 等待异步处理*/
}
}
data class GuavaEventBusTstEvent(
val messageId: String
)
@Component
class GuavaEventBusTstListener(private val asyncEventBus: EventBus) : InitializingBean {
@Subscribe
fun onEvent(event: GuavaEventBusTstEvent) {
logger.info("event:{}", event)
}
companion object {
private val logger = LogManager.getLogger(GuavaEventBusTstListener::class.java)
}
override fun afterPropertiesSet() {
asyncEventBus.register(this)
}
}
@Configuration
open class GuavaEventBusTstCfg {
@Bean
open fun asyncEventBus(): EventBus = AsyncEventBus(Executors.newFixedThreadPool(2))
}4-Kotlin routinex
协程的实现中的 Channel 天然就适合作为事件的发布管道.
- 使用
CorutineScope来管理协程的生命周期, 来Spring Bean启动后启动一个协程来消费事件 Channel: 使用协程的Channel来作为 事件的中间管道
import jakarta.annotation.PostConstruct
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import org.apache.logging.log4j.LogManager
import org.junit.jupiter.api.Test
import org.springframework.context.annotation.Configuration
import kotlinx.coroutines.*
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.stereotype.Component
internal class KRoutineTest {
@Test
fun `test publish`() = runBlocking {
val ctx = AnnotationConfigApplicationContext(KRoutineTestCfg::class.java)
val publisher = ctx.getBean(EventPublisher::class.java)
// val listener = ctx.getBean(KRoutineTestListener::class.java)
publisher.publish(KRoutineTestEvent("1"))
// 等待一下,确保异步处理完成
delay(10L)
}
}
@Configuration
open class KRoutineTestCfg {
@Bean
open fun eventChannel(): Channel<KRoutineTestEvent> = Channel(Channel.BUFFERED)
@Bean
open fun eventPublisher(channel: Channel<KRoutineTestEvent>): EventPublisher {
return EventPublisher(channel)
}
}
data class KRoutineTestEvent(
val msgId: String
)
class EventPublisher(private val channel: Channel<KRoutineTestEvent>) {
suspend fun publish(event: KRoutineTestEvent) {
channel.send(event)
}
}
@Component
open class KRoutineTestListener(
private val channel: Channel<KRoutineTestEvent>
) : CoroutineScope by CoroutineScope(Dispatchers.Default) {
@PostConstruct
fun init() {
launch {
try {
channel.consumeEach { event ->
onEvent(event)
}
} catch (e: CancellationException) {
logger.info("Coroutine cancelled")
} catch (t: Throwable) {
logger.error("Error in consuming channel", t)
}
}
}
private fun onEvent(event: KRoutineTestEvent) {
logger.info("event:{}", event)
}
companion object {
private val logger = LogManager.getLogger(KRoutineTestListener::class.java)
}
}