1. 概述
在之前的多篇教程中,我们提到过 Spring 的 BeanPostProcessor
。本文将结合 Guava 的 EventBus
,带你用一个真实场景深入理解它的威力。
BeanPostProcessor
是 Spring 提供的扩展机制,允许我们在 Bean 的生命周期中插入自定义逻辑,直接干预 Bean 的创建过程。
它不仅能“观察”,还能“动手”——修改 Bean 实例本身。本文将通过一个集成 Guava EventBus 的完整示例,展示如何利用 BeanPostProcessor
实现事件订阅的自动化注册与注销,避免手动调用 register()
和 unregister()
的繁琐和潜在内存泄漏。
2. 环境准备
首先,我们需要引入必要的依赖。在 pom.xml
中添加 Spring Context、Spring Expression 和 Guava:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
</dependency>
3. 目标与实现
我们的核心目标有两个:
✅ 使用 Guava EventBus 实现系统内各组件间的异步消息通信。
✅ 利用 BeanPostProcessor
在 Bean 创建时自动注册为 EventBus 的订阅者,在销毁时自动注销,彻底告别手动管理。
要实现上述目标,我们需要以下组件:
- Guava
EventBus
的封装类 - 自定义注解
@Subscriber
- 实现
DestructionAwareBeanPostProcessor
的处理器 - 事件模型
StockTrade
- 事件接收者
StockTradePublisher
- 集成测试用例
3.1. EventBus 封装类
为了避免直接暴露 Guava 的 EventBus
并提供统一访问入口,我们创建一个 GlobalEventBus
作为静态工具类:
public final class GlobalEventBus {
public static final String GLOBAL_EVENT_BUS_EXPRESSION
= "T(com.baeldung.postprocessor.GlobalEventBus).getEventBus()";
private static final String IDENTIFIER = "global-event-bus";
private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());
private GlobalEventBus() {}
public static GlobalEventBus getInstance() {
return GlobalEventBus.GLOBAL_EVENT_BUS;
}
public static EventBus getEventBus() {
return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
}
public static void subscribe(Object obj) {
getEventBus().register(obj);
}
public static void unsubscribe(Object obj) {
getEventBus().unregister(obj);
}
public static void post(Object event) {
getEventBus().post(event);
}
}
关键点说明:
AsyncEventBus
确保事件处理是异步的,避免阻塞发布者。GLOBAL_EVENT_BUS_EXPRESSION
是一个 SpEL 表达式,用于在注解中引用这个全局的 EventBus 实例。- 提供了
subscribe
、unsubscribe
和post
等静态方法,简化调用。
3.2. 自定义注解 @Subscriber
我们定义一个标记注解,用于标识哪些 Bean 需要被自动注册到 EventBus。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface Subscriber {
String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
}
@Retention(RUNTIME)
:确保注解在运行时可通过反射获取。@Target(ElementType.TYPE)
:只能标注在类上。value()
:可选参数,通过 SpEL 表达式指定要注册到哪个 EventBus。默认指向全局的GlobalEventBus
。
3.3. BeanPostProcessor 实现
这是整个方案的核心。我们实现 DestructionAwareBeanPostProcessor
,它继承自 BeanPostProcessor
,并增加了销毁前的回调。
public class GuavaEventBusBeanPostProcessor
implements DestructionAwareBeanPostProcessor {
Logger logger = LoggerFactory.getLogger(this.getClass());
SpelExpressionParser expressionParser = new SpelExpressionParser();
@Override
public void postProcessBeforeDestruction(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::unregister, "destruction");
}
@Override
public boolean requiresDestruction(Object bean) {
return true;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
this.process(bean, EventBus::register, "initialization");
return bean;
}
private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
Object proxy = this.getTargetObject(bean);
Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
if (annotation == null)
return;
this.logger.info("{}: processing bean of type {} during {}",
this.getClass().getSimpleName(), proxy.getClass().getName(), action);
String annotationValue = annotation.value();
try {
Expression expression = this.expressionParser.parseExpression(annotationValue);
Object value = expression.getValue();
if (!(value instanceof EventBus)) {
this.logger.error(
"{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
return;
}
EventBus eventBus = (EventBus)value;
consumer.accept(eventBus, proxy);
} catch (ExpressionException ex) {
this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}",
this.getClass().getSimpleName(), annotationValue, proxy.getClass().getName());
}
}
private Object getTargetObject(Object proxy) throws BeansException {
if (AopUtils.isJdkDynamicProxy(proxy)) {
try {
return ((Advised)proxy).getTargetSource().getTarget();
} catch (Exception e) {
throw new FatalBeanException("Error getting target of JDK proxy", e);
}
}
return proxy;
}
}
工作流程解析:
- **
postProcessAfterInitialization
**:在 Bean 初始化完成后调用。此时 Bean 已完全构建,我们调用process
方法,传入EventBus::register
作为BiConsumer
,尝试注册该 Bean。 - **
postProcessBeforeDestruction
**:在 Bean 销毁前调用。调用process
方法,传入EventBus::unregister
,进行注销。 process
方法:- 通过
getTargetObject
获取原始 Bean(处理 AOP 代理的情况)。 - 检查 Bean 的类上是否有
@Subscriber
注解。 - 如果有,解析注解中的 SpEL 表达式(
value()
)。 - 将表达式求值,得到
EventBus
实例。 - 使用传入的
BiConsumer
(register
或unregister
)对 Bean 和EventBus
执行操作。
- 通过
-
getTargetObject
方法:处理 Spring AOP 生成的 JDK 动态代理,获取被代理的真实目标对象,确保事件订阅的是业务逻辑类而非代理类。
⚠️ 踩坑提示:requiresDestruction
必须返回 true
,否则 postProcessBeforeDestruction
不会触发。Spring 会根据此方法判断该 BeanPostProcessor 是否需要参与销毁流程。
3.4. StockTrade 模型
定义一个简单的股票交易事件模型:
public class StockTrade {
private String symbol;
private int quantity;
private double price;
private Date tradeDate;
// constructor, getters, setters (省略)
}
3.5. 事件接收者 StockTradePublisher
这个类将被 @Subscriber
标注,成为 EventBus 的订阅者。
@FunctionalInterface
public interface StockTradeListener {
void stockTradePublished(StockTrade trade);
}
@Subscriber
public class StockTradePublisher {
Set<StockTradeListener> stockTradeListeners = new HashSet<>();
public void addStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.add(listener);
}
}
public void removeStockTradeListener(StockTradeListener listener) {
synchronized (this.stockTradeListeners) {
this.stockTradeListeners.remove(listener);
}
}
@Subscribe
@AllowConcurrentEvents
void handleNewStockTradeEvent(StockTrade trade) {
// publish to DB, send to PubNub, ...
Set<StockTradeListener> listeners;
synchronized (this.stockTradeListeners) {
listeners = new HashSet<>(this.stockTradeListeners);
}
listeners.forEach(li -> li.stockTradePublished(trade));
}
}
关键点说明:
-
@Subscriber
:让BeanPostProcessor
在初始化时自动将其注册到EventBus
。 -
@Subscribe
:Guava 的注解,标记handleNewStockTradeEvent
方法为事件处理方法。方法参数类型StockTrade
决定了它接收的事件类型。 -
@AllowConcurrentEvents
:允许该方法被多个线程并发执行,提高处理吞吐量。 - 方法内部将事件转发给所有注册的
StockTradeListener
。
3.6. 集成测试
编写测试用例验证整个流程:
@Configuration
public class PostProcessorConfiguration {
@Bean
public GlobalEventBus eventBus() {
return GlobalEventBus.getInstance();
}
@Bean
public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
return new GuavaEventBusBeanPostProcessor();
}
@Bean
public StockTradePublisher stockTradePublisher() {
return new StockTradePublisher();
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = PostProcessorConfiguration.class)
public class StockTradeIntegrationTest {
@Autowired
StockTradePublisher stockTradePublisher;
@Test
public void givenValidConfig_whenTradePublished_thenTradeReceived() {
Date tradeDate = new Date();
StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
AtomicBoolean assertionsPassed = new AtomicBoolean(false);
StockTradeListener listener = trade -> assertionsPassed
.set(this.verifyExact(stockTrade, trade));
this.stockTradePublisher.addStockTradeListener(listener);
try {
GlobalEventBus.post(stockTrade);
await().atMost(Duration.ofSeconds(2L))
.untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
} finally {
this.stockTradePublisher.removeStockTradeListener(listener);
}
}
boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
return Objects.equals(stockTrade.getSymbol(), trade.getSymbol())
&& Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate())
&& stockTrade.getQuantity() == trade.getQuantity()
&& stockTrade.getPrice() == trade.getPrice();
}
}
测试逻辑:
- 启动 Spring 上下文,
GuavaEventBusBeanPostProcessor
会自动将stockTradePublisher
Bean 注册到EventBus
。 - 创建一个
StockTrade
事件并发布到GlobalEventBus
。 -
StockTradePublisher
的handleNewStockTradeEvent
方法被触发,将事件转发给监听器。 - 断言监听器接收到的事件与发布的事件完全一致。
- 使用
await().atMost()
处理异步调用的等待,避免测试过早结束。
4. 总结
通过这个实战示例,我们可以看到 BeanPostProcessor
的强大之处:
- 自动化:将繁琐的
register/unregister
操作完全自动化,开发者只需添加@Subscriber
注解即可。 - 解耦:事件发布者无需知道具体的订阅者,订阅者也无需关心自己是如何被注册的。
- 安全:利用
DestructionAwareBeanPostProcessor
确保 Bean 销毁时必然注销,防止内存泄漏。
BeanPostProcessor
是 Spring 扩展机制中非常核心且灵活的工具,合理使用可以极大地提升代码的简洁性和可维护性。这个集成 Guava EventBus 的例子,就是一个简单粗暴又实用的典型应用场景。