1. 概述

在之前的多篇教程中,我们提到过 Spring 的 BeanPostProcessor。本文将结合 Guava 的 EventBus,带你用一个真实场景深入理解它的威力。

BeanPostProcessor 是 Spring 提供的扩展机制,允许我们在 Bean 的生命周期中插入自定义逻辑,直接干预 Bean 的创建过程。

它不仅能“观察”,还能“动手”——修改 Bean 实例本身。本文将通过一个集成 Guava EventBus 的完整示例,展示如何利用 BeanPostProcessor 实现事件订阅的自动化注册与注销,避免手动调用 register()unregister() 的繁琐和潜在内存泄漏。

2. 环境准备

首先,我们需要引入必要的依赖。在 pom.xml 中添加 Spring Context、Spring Expression 和 Guava:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-expression</artifactId>
    <version>5.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

3. 目标与实现

我们的核心目标有两个:

使用 Guava EventBus 实现系统内各组件间的异步消息通信。
利用 BeanPostProcessor 在 Bean 创建时自动注册为 EventBus 的订阅者,在销毁时自动注销,彻底告别手动管理。

要实现上述目标,我们需要以下组件:

  • Guava EventBus 的封装类
  • 自定义注解 @Subscriber
  • 实现 DestructionAwareBeanPostProcessor 的处理器
  • 事件模型 StockTrade
  • 事件接收者 StockTradePublisher
  • 集成测试用例

3.1. EventBus 封装类

为了避免直接暴露 Guava 的 EventBus 并提供统一访问入口,我们创建一个 GlobalEventBus 作为静态工具类:

public final class GlobalEventBus {

    public static final String GLOBAL_EVENT_BUS_EXPRESSION
      = "T(com.baeldung.postprocessor.GlobalEventBus).getEventBus()";

    private static final String IDENTIFIER = "global-event-bus";
    private static final GlobalEventBus GLOBAL_EVENT_BUS = new GlobalEventBus();
    private final EventBus eventBus = new AsyncEventBus(IDENTIFIER, Executors.newCachedThreadPool());

    private GlobalEventBus() {}

    public static GlobalEventBus getInstance() {
        return GlobalEventBus.GLOBAL_EVENT_BUS;
    }

    public static EventBus getEventBus() {
        return GlobalEventBus.GLOBAL_EVENT_BUS.eventBus;
    }

    public static void subscribe(Object obj) {
        getEventBus().register(obj);
    }
    
    public static void unsubscribe(Object obj) {
        getEventBus().unregister(obj);
    }
    
    public static void post(Object event) {
        getEventBus().post(event);
    }
}

关键点说明:

  • AsyncEventBus 确保事件处理是异步的,避免阻塞发布者。
  • GLOBAL_EVENT_BUS_EXPRESSION 是一个 SpEL 表达式,用于在注解中引用这个全局的 EventBus 实例。
  • 提供了 subscribeunsubscribepost 等静态方法,简化调用。

3.2. 自定义注解 @Subscriber

我们定义一个标记注解,用于标识哪些 Bean 需要被自动注册到 EventBus。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface Subscriber {
    String value() default GlobalEventBus.GLOBAL_EVENT_BUS_EXPRESSION;
}
  • @Retention(RUNTIME):确保注解在运行时可通过反射获取。
  • @Target(ElementType.TYPE):只能标注在类上。
  • value():可选参数,通过 SpEL 表达式指定要注册到哪个 EventBus。默认指向全局的 GlobalEventBus

3.3. BeanPostProcessor 实现

这是整个方案的核心。我们实现 DestructionAwareBeanPostProcessor,它继承自 BeanPostProcessor,并增加了销毁前的回调。

public class GuavaEventBusBeanPostProcessor
  implements DestructionAwareBeanPostProcessor {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    SpelExpressionParser expressionParser = new SpelExpressionParser();

    @Override
    public void postProcessBeforeDestruction(Object bean, String beanName)
      throws BeansException {
        this.process(bean, EventBus::unregister, "destruction");
    }

    @Override
    public boolean requiresDestruction(Object bean) {
        return true;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName)
      throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName)
      throws BeansException {
        this.process(bean, EventBus::register, "initialization");
        return bean;
    }

    private void process(Object bean, BiConsumer<EventBus, Object> consumer, String action) {
        Object proxy = this.getTargetObject(bean);
        Subscriber annotation = AnnotationUtils.getAnnotation(proxy.getClass(), Subscriber.class);
        if (annotation == null)
            return;
        this.logger.info("{}: processing bean of type {} during {}",
          this.getClass().getSimpleName(), proxy.getClass().getName(), action);
        String annotationValue = annotation.value();
        try {
            Expression expression = this.expressionParser.parseExpression(annotationValue);
            Object value = expression.getValue();
            if (!(value instanceof EventBus)) {
                this.logger.error(
                  "{}: expression {} did not evaluate to an instance of EventBus for bean of type {}",
                  this.getClass().getSimpleName(), annotationValue, proxy.getClass().getSimpleName());
                return;
            }
            EventBus eventBus = (EventBus)value;
            consumer.accept(eventBus, proxy);
        } catch (ExpressionException ex) {
            this.logger.error("{}: unable to parse/evaluate expression {} for bean of type {}",
              this.getClass().getSimpleName(), annotationValue, proxy.getClass().getName());
        }
    }

    private Object getTargetObject(Object proxy) throws BeansException {
        if (AopUtils.isJdkDynamicProxy(proxy)) {
            try {
                return ((Advised)proxy).getTargetSource().getTarget();
            } catch (Exception e) {
                throw new FatalBeanException("Error getting target of JDK proxy", e);
            }
        }
        return proxy;
    }
}

工作流程解析:

  1. **postProcessAfterInitialization**:在 Bean 初始化完成后调用。此时 Bean 已完全构建,我们调用 process 方法,传入 EventBus::register 作为 BiConsumer,尝试注册该 Bean。
  2. **postProcessBeforeDestruction**:在 Bean 销毁前调用。调用 process 方法,传入 EventBus::unregister,进行注销。
  3. process 方法
    • 通过 getTargetObject 获取原始 Bean(处理 AOP 代理的情况)。
    • 检查 Bean 的类上是否有 @Subscriber 注解。
    • 如果有,解析注解中的 SpEL 表达式(value())。
    • 将表达式求值,得到 EventBus 实例。
    • 使用传入的 BiConsumerregisterunregister)对 Bean 和 EventBus 执行操作。
  4. getTargetObject 方法:处理 Spring AOP 生成的 JDK 动态代理,获取被代理的真实目标对象,确保事件订阅的是业务逻辑类而非代理类。

⚠️ 踩坑提示requiresDestruction 必须返回 true,否则 postProcessBeforeDestruction 不会触发。Spring 会根据此方法判断该 BeanPostProcessor 是否需要参与销毁流程。

3.4. StockTrade 模型

定义一个简单的股票交易事件模型:

public class StockTrade {

    private String symbol;
    private int quantity;
    private double price;
    private Date tradeDate;
    
    // constructor, getters, setters (省略)
}

3.5. 事件接收者 StockTradePublisher

这个类将被 @Subscriber 标注,成为 EventBus 的订阅者。

@FunctionalInterface
public interface StockTradeListener {
    void stockTradePublished(StockTrade trade);
}
@Subscriber
public class StockTradePublisher {

    Set<StockTradeListener> stockTradeListeners = new HashSet<>();

    public void addStockTradeListener(StockTradeListener listener) {
        synchronized (this.stockTradeListeners) {
            this.stockTradeListeners.add(listener);
        }
    }

    public void removeStockTradeListener(StockTradeListener listener) {
        synchronized (this.stockTradeListeners) {
            this.stockTradeListeners.remove(listener);
        }
    }

    @Subscribe
    @AllowConcurrentEvents
    void handleNewStockTradeEvent(StockTrade trade) {
        // publish to DB, send to PubNub, ...
        Set<StockTradeListener> listeners;
        synchronized (this.stockTradeListeners) {
            listeners = new HashSet<>(this.stockTradeListeners);
        }
        listeners.forEach(li -> li.stockTradePublished(trade));
    }
}

关键点说明:

  • @Subscriber:让 BeanPostProcessor 在初始化时自动将其注册到 EventBus
  • @Subscribe:Guava 的注解,标记 handleNewStockTradeEvent 方法为事件处理方法。方法参数类型 StockTrade 决定了它接收的事件类型。
  • @AllowConcurrentEvents:允许该方法被多个线程并发执行,提高处理吞吐量。
  • 方法内部将事件转发给所有注册的 StockTradeListener

3.6. 集成测试

编写测试用例验证整个流程:

@Configuration
public class PostProcessorConfiguration {

    @Bean
    public GlobalEventBus eventBus() {
        return GlobalEventBus.getInstance();
    }

    @Bean
    public GuavaEventBusBeanPostProcessor eventBusBeanPostProcessor() {
        return new GuavaEventBusBeanPostProcessor();
    }

    @Bean
    public StockTradePublisher stockTradePublisher() {
        return new StockTradePublisher();
    }
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = PostProcessorConfiguration.class)
public class StockTradeIntegrationTest {

    @Autowired
    StockTradePublisher stockTradePublisher;

    @Test
    public void givenValidConfig_whenTradePublished_thenTradeReceived() {
        Date tradeDate = new Date();
        StockTrade stockTrade = new StockTrade("AMZN", 100, 2483.52d, tradeDate);
        AtomicBoolean assertionsPassed = new AtomicBoolean(false);
        StockTradeListener listener = trade -> assertionsPassed
          .set(this.verifyExact(stockTrade, trade));
        this.stockTradePublisher.addStockTradeListener(listener);
        try {
            GlobalEventBus.post(stockTrade);
            await().atMost(Duration.ofSeconds(2L))
              .untilAsserted(() -> assertThat(assertionsPassed.get()).isTrue());
        } finally {
            this.stockTradePublisher.removeStockTradeListener(listener);
        }
    }

    boolean verifyExact(StockTrade stockTrade, StockTrade trade) {
        return Objects.equals(stockTrade.getSymbol(), trade.getSymbol())
          && Objects.equals(stockTrade.getTradeDate(), trade.getTradeDate())
          && stockTrade.getQuantity() == trade.getQuantity()
          && stockTrade.getPrice() == trade.getPrice();
    }
}

测试逻辑:

  1. 启动 Spring 上下文,GuavaEventBusBeanPostProcessor 会自动将 stockTradePublisher Bean 注册到 EventBus
  2. 创建一个 StockTrade 事件并发布到 GlobalEventBus
  3. StockTradePublisherhandleNewStockTradeEvent 方法被触发,将事件转发给监听器。
  4. 断言监听器接收到的事件与发布的事件完全一致。
  5. 使用 await().atMost() 处理异步调用的等待,避免测试过早结束。

4. 总结

通过这个实战示例,我们可以看到 BeanPostProcessor 的强大之处:

  • 自动化:将繁琐的 register/unregister 操作完全自动化,开发者只需添加 @Subscriber 注解即可。
  • 解耦:事件发布者无需知道具体的订阅者,订阅者也无需关心自己是如何被注册的。
  • 安全:利用 DestructionAwareBeanPostProcessor 确保 Bean 销毁时必然注销,防止内存泄漏。

BeanPostProcessor 是 Spring 扩展机制中非常核心且灵活的工具,合理使用可以极大地提升代码的简洁性和可维护性。这个集成 Guava EventBus 的例子,就是一个简单粗暴又实用的典型应用场景。


原始标题:Spring BeanPostProcessor