Encapsulating EventBus in Spring Boot

Foreword

EventBus is Guava's event processing mechanism and an implementation of the observer pattern (a producer/consumer model).

The observer pattern is widely used in day-to-day development. For example, in an order system, a change in order status or logistics information triggers app push notifications, text messages, and notices to sellers and buyers; in an approval system, each step in the flow of an approval request notifies the user who initiated it, the approving manager, and so on.

The JDK also supports the observer pattern: java.util.Observer has existed since version 1.0. However, its usage has not evolved while Java itself was upgraded rapidly, and many libraries provide simpler implementations, such as Guava EventBus, RxJava, EventBus, etc.
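For comparison, here is a minimal sketch of the JDK's own Observer API. The `OrderStatus` class and the `smsNotifier` listener are hypothetical names chosen for illustration. Note that the subject has to extend `Observable`, and the payload arrives as an untyped `Object`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Observable;
import java.util.Observer;

// Sketch only: OrderStatus and smsNotifier are hypothetical names, not from the article.
@SuppressWarnings("deprecation") // java.util.Observable/Observer are deprecated since Java 9
class JdkObserverDemo {

    /** The subject must extend Observable, which uses up its single superclass slot. */
    static class OrderStatus extends Observable {
        void change(String newStatus) {
            setChanged();               // without this call, notifyObservers does nothing
            notifyObservers(newStatus); // the payload is passed as an untyped Object
        }
    }

    /** Registers one observer, fires one change, and returns what the observer received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        OrderStatus status = new OrderStatus();
        // arg is an Object; typed use would need a cast
        Observer smsNotifier = (source, arg) -> received.add("sms:" + arg);
        status.addObserver(smsNotifier);
        status.change("SHIPPED");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```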

1. Why use the Observer pattern, and what are the advantages of EventBus?

EventBus Advantages

  • Simpler and more convenient to program against than the JDK Observer
  • Synchronous or asynchronous dispatch and exception handling can be customized through constructor parameters
  • Runs inside a single process, so there is no network overhead

Disadvantages

  • Can only be used within a single process
  • Messages are not persisted, so events still in flight are lost if the application restarts or exits abnormally.

For distributed scenarios, you still need a message queue (MQ).

2. Steps to use EventBus

Import the corresponding dependencies

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>19.0</version>
</dependency>

Write thread pool configuration

@Data
@ConfigurationProperties(prefix = "eventbus.pool")
public class EventBusExecutorProperites {
    /**
     * Number of core threads
     */
    private int corePoolSize = 5;
    /**
     * Maximum number of threads
     */
    private int maxPoolSize = 20;
    /**
     * Idle thread destruction time
     */
    private int keepAliveSeconds = 60;

    /**
     * Task queue capacity
     */
    private int queueCapacity = 1000;

    /**
     * Whether to allow core threads to exit idle
     */
    private boolean allowCoreThreadTimeOut = false;

    /**
     * Thread prefix
     */
    private String threadNamePrefix = "event-bus-pool-";
}

A subscriber method in EventBus must accept exactly one parameter, so declare a base class for that parameter

@Data
public abstract class BaseCallableBean {
    private String oper;
    private String opeTime;
}
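To see why a subscriber method takes exactly one parameter, the following is a minimal sketch of annotation-driven dispatch that uses only the JDK. `MiniBus` and `@Handles` are hypothetical stand-ins, not Guava's `EventBus` or `@Subscribe`, and Guava's real implementation differs (caching, type hierarchies, executors). The point is only that the single parameter's type acts as the routing key.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Sketch only: MiniBus and @Handles are hypothetical, not Guava's EventBus or @Subscribe.
class MiniBusDemo {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    static class MiniBus {
        private final List<Object> listeners = new ArrayList<>();

        void register(Object listener) {
            for (Method m : listener.getClass().getDeclaredMethods()) {
                // The single parameter is the routing key, so any other arity is rejected.
                if (m.isAnnotationPresent(Handles.class) && m.getParameterCount() != 1) {
                    throw new IllegalArgumentException(m.getName() + " must take exactly one parameter");
                }
            }
            listeners.add(listener);
        }

        /** Invokes every annotated method whose parameter type accepts the event; returns the count. */
        int post(Object event) {
            int delivered = 0;
            for (Object listener : listeners) {
                for (Method m : listener.getClass().getDeclaredMethods()) {
                    if (m.isAnnotationPresent(Handles.class)
                            && m.getParameterTypes()[0].isInstance(event)) {
                        try {
                            m.setAccessible(true);
                            m.invoke(listener, event);
                        } catch (ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                        delivered++;
                    }
                }
            }
            return delivered;
        }
    }

    static class Audit {
        final List<String> seen = new ArrayList<>();

        @Handles
        void onText(String text) { seen.add("text:" + text); }

        @Handles
        void onNumber(Integer number) { seen.add("number:" + number); }
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        Audit audit = new Audit();
        bus.register(audit);
        bus.post("hello"); // only onText accepts a String
        bus.post(42);      // only onNumber accepts an Integer
        System.out.println(audit.seen);
    }
}
```

Because routing depends on the parameter type alone, a shared base class such as BaseCallableBean gives every event a common set of fields without changing how events are matched to subscribers.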

Declare the subscriber base class

public abstract class BaseCallable<T extends BaseCallableBean> {
    public abstract void call(T t);
}

Define the event parameter class and the event listener class

Note: the listener method must carry the @Subscribe annotation

@Data
@ToString
@AllArgsConstructor
public class TestBean extends BaseCallableBean {
    private String id;
    private String name;
}

@Slf4j
@Component
public class TestBaseCallable extends BaseCallable<TestBean> {
    @Override
    @Subscribe
    public void call(TestBean testBean) {
        log.info("hello");
        log.info(testBean.toString());
    }
}

Configure EventBus

@Slf4j
@Configuration
@EnableConfigurationProperties(EventBusExecutorProperites.class)
public class EventBusConfiguration implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private EventBusExecutorProperites executorProperites;

    @Bean("eventBusExecutor")
    @Primary
    public Executor getThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // core thread
        executor.setCorePoolSize(executorProperites.getCorePoolSize());
        //Maximum number of threads
        executor.setMaxPoolSize(executorProperites.getMaxPoolSize());
        //Task queue capacity
        executor.setQueueCapacity(executorProperites.getQueueCapacity());
        // Thread prefix
        executor.setThreadNamePrefix(executorProperites.getThreadNamePrefix());
        // Whether to allow core threads to exit idle
        executor.setAllowCoreThreadTimeOut(executorProperites.isAllowCoreThreadTimeOut());
        //Idle thread destruction time
        executor.setKeepAliveSeconds(executorProperites.getKeepAliveSeconds());
        // Task decorator: wraps every submitted task, similar to AOP
        executor.setTaskDecorator(new MDCTaskDecorator());
        // When the pool and queue are full, do not discard the task; run it on the calling thread instead
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // true: stop with shutdown() so queued tasks still run; false: stop with shutdownNow()
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    static class MDCTaskDecorator implements TaskDecorator {
        @Override
        public Runnable decorate(Runnable runnable) {
            // Capture a copy of the submitting thread's MDC context.
            Map<String, String> contextMap = MDC.getCopyOfContextMap();
            return () -> {
                // Remember the executing thread's own MDC so it can be restored afterwards.
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (contextMap != null) {
                    // Pass the submitter's context on to the thread that runs the task.
                    MDC.setContextMap(contextMap);
                }
                try {
                    runnable.run();
                } finally {
                    // Pooled threads are reused, so do not let this task's context leak into the next one.
                    if (previous != null) {
                        MDC.setContextMap(previous);
                    } else {
                        MDC.clear();
                    }
                }
            };
        }
    }

    @Bean("eventBus")
    public EventBus eventBus() {
        return new AsyncEventBus(getThreadPoolTaskExecutor());
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        registerSubscribes(event.getApplicationContext());
    }

    private void registerSubscribes(ApplicationContext context) {
        if (null == context) {
            return;
        }
        // Look up the names of all beans of type BaseCallable in the context
        String[] beanDefinitionNames = context.getBeanNamesForType(BaseCallable.class, false, true);
        // Fetch the EventBus bean from the context so the subscribers can be registered on it
        EventBus eventBus = context.getBean(EventBus.class);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = context.getBean(beanDefinitionName);
            try {
                // Sanity check: getMethod never returns null, it throws if "call" is missing
                bean.getClass().getMethod("call", BaseCallableBean.class);
                log.info("EventBus method-Subscribe register bean[{}]", beanDefinitionName);
            } catch (NoSuchMethodException e) {
                log.warn("EventBus method-Subscribe register bean Exception : {}", e.getMessage(), e);
            }
            eventBus.register(bean);
        }
    }
}
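The configuration above uses CallerRunsPolicy as its rejection handler. The sketch below, using only the JDK's ThreadPoolExecutor, illustrates what that policy does once both the worker threads and the queue are full. The pool sizes (one worker, one queue slot) and the task labels are illustrative assumptions, not the article's defaults.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch only: pool sizes and labels are illustrative, not the article's defaults.
class CallerRunsDemo {

    /** Submits three tasks to a pool with one worker and one queue slot; records where each ran. */
    static List<String> run() {
        Thread caller = Thread.currentThread();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Task 1 occupies the only worker thread until it is released.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                ranOn.add("task1@" + where(caller));
            });
            started.await();
            // Task 2 fills the only queue slot.
            pool.execute(() -> ranOn.add("task2@" + where(caller)));
            // Task 3 is rejected, so CallerRunsPolicy runs it synchronously on the submitting thread.
            pool.execute(() -> ranOn.add("task3@" + where(caller)));

            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn;
    }

    private static String where(Thread caller) {
        return Thread.currentThread() == caller ? "caller" : "worker";
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With an AsyncEventBus backed by such a pool, this suggests that a saturated pool makes the thread calling eventBus.post run the subscriber itself, so the post can block the request thread; whether that trade-off is acceptable depends on the workload.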

Test

@RestController
@SpringBootApplication
public class DemoApplication {
    @Autowired
    private EventBus eventBus;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @GetMapping("testEventBus")
    public String testEventBus() {
        TestBean testBean = new TestBean("1","hello");
        eventBus.post(testBean);
        return "ok";
    }
}

Test Results