Programmer Zhuifeng 2023-05-20 22:00 Published in Hunan
Original: blog.csdn.net/qq_34886352/article/details/106494637
1. Function description
This tool enhances SpringBoot's scheduled tasks. It adds dynamic (runtime) management of SpringBoot's native scheduled tasks, is fully compatible with the native @Scheduled annotation, and requires no changes to existing scheduled tasks.
2. Quick use
The functionality is packaged as a SpringBoot starter, so it works plug-and-play once the dependency is added:
<dependency>
    <groupId>com.github.guoyixing</groupId>
    <artifactId>spring-boot-starter-super-scheduled</artifactId>
    <version>0.3.1</version>
</dependency>
3. Implementation principle
1. Implementing dynamic management
(1) The configuration manager
@Component("superScheduledConfig")
public class SuperScheduledConfig {
    /**
     * Thread pool that executes the scheduled tasks
     */
    private ThreadPoolTaskScheduler taskScheduler;

    /**
     * Maps each scheduled task name to the ScheduledFuture handle of its running schedule
     */
    private Map<String, ScheduledFuture<?>> nameToScheduledFuture = new ConcurrentHashMap<>();

    /**
     * Maps each scheduled task name to the logic the task executes
     */
    private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();

    /**
     * Maps each scheduled task name to the source information of the task
     */
    private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();

    /* Ordinary getters/setters omitted */
}
(2) Use a post-processor to intercept SpringBoot’s original scheduled tasks
- Implement the ApplicationContextAware interface to obtain the SpringBoot application context
- Implement the BeanPostProcessor interface to mark this class as a post-processor; its callbacks run around the initialization of every bean
- Use the @DependsOn annotation to force a dependency on SuperScheduledConfig, so that SpringBoot creates SuperScheduledConfig before it creates SuperScheduledPostProcessor
- The main logic is in the postProcessAfterInitialization() method
@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());

    private ApplicationContext applicationContext;

    /**
     * Callback before the bean is initialized
     *
     * @param bean     bean instance
     * @param beanName name of the bean
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Callback after the bean is initialized
     *
     * @param bean     bean instance
     * @param beanName name of the bean
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //1. Get the configuration manager
        SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);
        //2. Get all methods declared by the current bean
        Method[] methods = bean.getClass().getDeclaredMethods();
        //Process the methods one by one
        if (methods.length > 0) {
            for (Method method : methods) {
                //3. Try to get the @Scheduled annotation on this method (SpringBoot's scheduled task annotation)
                Scheduled annotation = method.getAnnotation(Scheduled.class);
                //Skip the method if it has no @Scheduled annotation
                if (annotation == null) {
                    continue;
                }
                //4. Create the source data of the scheduled task (records the task configuration;
                //   at initialization it holds the original attributes of the annotation)
                ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
                //Validate the attributes read from the annotation
                if (!scheduledSource.check()) {
                    throw new SuperScheduledException("Annotation parameter error on method " + method.getName() + " of bean " + beanName);
                }
                //Generate the name (id) of the scheduled task: beanName + "." + method name
                String name = beanName + "." + method.getName();
                //Store the source data in the configuration manager as key-value; key: task name, value: source data
                superScheduledConfig.addScheduledSource(name, scheduledSource);
                try {
                    //5. Cancel the original SpringBoot scheduled task
                    clearOriginalScheduled(annotation);
                } catch (Exception e) {
                    throw new SuperScheduledException("An error occurred while disabling the original method " + beanName + "." + method.getName());
                }
            }
        }
        //Return the bean unchanged
        return bean;
    }

    /**
     * Overwrite the original attributes of the annotation so that the native scheduler ignores it
     * (changeAnnotationValue is a helper that is not shown in this excerpt)
     *
     * @param annotation annotation instance
     * @throws Exception if an attribute cannot be modified
     */
    private void clearOriginalScheduled(Scheduled annotation) throws Exception {
        changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);
        changeAnnotationValue(annotation, "fixedDelay", -1L);
        changeAnnotationValue(annotation, "fixedDelayString", "");
        changeAnnotationValue(annotation, "fixedRate", -1L);
        changeAnnotationValue(annotation, "fixedRateString", "");
        changeAnnotationValue(annotation, "initialDelay", -1L);
        changeAnnotationValue(annotation, "initialDelayString", "");
    }

    /**
     * Receive the SpringBoot application context
     *
     * @param applicationContext SpringBoot application context
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
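The scanning step of the post-processor can be tried without Spring. The following is only a minimal sketch under stated assumptions: the @Every annotation, the ReportJob class and the scan() helper are hypothetical stand-ins for @Scheduled, a user bean and the loop inside postProcessAfterInitialization(). It shows how reflection finds the annotated methods and builds the beanName + "." + method name key.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScanSketch {

    /** Hypothetical stand-in for Spring's @Scheduled; only fixedRate is modelled. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long fixedRate();
    }

    /** Hypothetical bean with one annotated method and one plain method. */
    static class ReportJob {
        @Every(fixedRate = 5000)
        public void send() { }

        public void helper() { }
    }

    /** Collects "beanName.methodName" -> fixedRate for every annotated method. */
    static Map<String, Long> scan(String beanName, Object bean) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Every annotation = method.getAnnotation(Every.class);
            if (annotation == null) {
                continue; // not a scheduled method, skip it
            }
            found.put(beanName + "." + method.getName(), annotation.fixedRate());
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan("reportJob", new ReportJob())); // {reportJob.send=5000}
    }
}
```

Only methods declared directly on the bean class are inspected here, which mirrors the getDeclaredMethods() call in the listing above.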
(3) Use ApplicationRunner to initialize a custom scheduled task runner
- Implement the ApplicationContextAware interface to obtain the SpringBoot application context
- Use the @DependsOn annotation to force a dependency on the threadPoolTaskScheduler bean
- Implement the ApplicationRunner interface to run custom logic after all beans have been initialized
- The main logic is in the run() method
@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private ApplicationContext applicationContext;

    /**
     * Scheduled task configuration manager
     */
    @Autowired
    private SuperScheduledConfig superScheduledConfig;

    /**
     * Thread pool that executes the scheduled tasks
     */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Override
    public void run(ApplicationArguments args) {
        //1. Cache the task scheduler in the configuration manager
        superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
        //2. Get the source data of all scheduled tasks
        Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
        //Process the scheduled tasks one by one
        for (String name : nameToScheduledSource.keySet()) {
            //3. Get the source data of this scheduled task
            ScheduledSource scheduledSource = nameToScheduledSource.get(name);
            //4. Get the bean names of all enhancers
            String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
            //5. Create the execution controller
            SuperScheduledRunnable runnable = new SuperScheduledRunnable();
            //Configure the execution controller
            runnable.setMethod(scheduledSource.getMethod());
            runnable.setBean(scheduledSource.getBean());
            //6. Process the enhancers one by one (how the enhancers work is analyzed later)
            List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
            for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
                //7. Proxy the enhancer into a Point
                Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
                //Create the proxy
                Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
                proxy.setSuperScheduledName(name);
                //8. Collect all the points
                points.add(proxy);
            }
            //Form the points into a call chain
            runnable.setChain(new Chain(points));
            //Cache the execution logic in the configuration manager
            superScheduledConfig.addRunnable(name, runnable::invoke);
            try {
                //9. Start the scheduled task
                ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler, scheduledSource, runnable::invoke);
                //Save the ScheduledFuture handle in the configuration manager
                superScheduledConfig.addScheduledFuture(name, schedule);
                logger.info(df.format(LocalDateTime.now()) + " task " + name + " started...");
            } catch (Exception e) {
                throw new SuperScheduledException("Task " + name + " failed to start, error message: " + e.getLocalizedMessage());
            }
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
(4) Perform dynamic management
@Component
public class SuperScheduledManager {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private SuperScheduledConfig superScheduledConfig;

    /**
     * Change the cron expression of a scheduled task
     *
     * @param name scheduled task name
     * @param cron cron expression
     */
    public void setScheduledCron(String name, String cron) {
        //Terminate the original task
        cancelScheduled(name);
        //Create a new task
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource);
    }

    /**
     * Change the fixedDelay of a scheduled task
     *
     * @param name       scheduled task name
     * @param fixedDelay delay between the end of one execution and the start of the next
     */
    public void setScheduledFixedDelay(String name, Long fixedDelay) {
        //Terminate the original task
        cancelScheduled(name);
        //Create a new task
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedDelay(fixedDelay);
        addScheduled(name, scheduledSource);
    }

    /**
     * Change the fixedRate of a scheduled task
     *
     * @param name      scheduled task name
     * @param fixedRate interval between the starts of two consecutive executions
     */
    public void setScheduledFixedRate(String name, Long fixedRate) {
        //Terminate the original task
        cancelScheduled(name);
        //Create a new task
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedRate(fixedRate);
        addScheduled(name, scheduledSource);
    }

    /**
     * Query the names of all running scheduled tasks
     */
    public List<String> getRunScheduledName() {
        Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
        return new ArrayList<>(names);
    }

    /**
     * Query the names of all scheduled tasks
     */
    public List<String> getAllSuperScheduledName() {
        Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
        return new ArrayList<>(names);
    }

    /**
     * Terminate a scheduled task
     *
     * @param name scheduled task name
     */
    public void cancelScheduled(String name) {
        ScheduledFuture<?> scheduledFuture = superScheduledConfig.getScheduledFuture(name);
        scheduledFuture.cancel(true);
        superScheduledConfig.removeScheduledFuture(name);
        logger.info(df.format(LocalDateTime.now()) + " task " + name + " terminated...");
    }

    /**
     * Start a scheduled task
     *
     * @param name            scheduled task name
     * @param scheduledSource source information of the scheduled task
     */
    public void addScheduled(String name, ScheduledSource scheduledSource) {
        if (getRunScheduledName().contains(name)) {
            throw new SuperScheduledException("Scheduled task " + name + " has already been started");
        }
        if (!scheduledSource.check()) {
            throw new SuperScheduledException("Scheduled task " + name + " has invalid source data");
        }
        scheduledSource.refreshType();
        Runnable runnable = superScheduledConfig.getRunnable(name);
        ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();
        ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
        logger.info(df.format(LocalDateTime.now()) + " task " + name + " started...");
        superScheduledConfig.addScheduledSource(name, scheduledSource);
        superScheduledConfig.addScheduledFuture(name, schedule);
    }

    /**
     * Start a scheduled task of the cron type
     *
     * @param name scheduled task name
     * @param cron cron expression
     */
    public void addCronScheduled(String name, String cron) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource);
    }

    /**
     * Start a scheduled task of the fixedDelay type
     *
     * @param name         scheduled task name
     * @param fixedDelay   delay between the end of one execution and the start of the next
     * @param initialDelay delay before the first execution (at most one value)
     */
    public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedDelay(fixedDelay);
        if (initialDelay != null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {
            throw new SuperScheduledException("Only one value may be passed for the delay before the first execution");
        }
        addScheduled(name, scheduledSource);
    }

    /**
     * Start a scheduled task of the fixedRate type
     *
     * @param name         scheduled task name
     * @param fixedRate    interval between the starts of two consecutive executions
     * @param initialDelay delay before the first execution (at most one value)
     */
    public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedRate(fixedRate);
        if (initialDelay != null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {
            throw new SuperScheduledException("Only one value may be passed for the delay before the first execution");
        }
        addScheduled(name, scheduledSource);
    }

    /**
     * Execute a task once, manually
     *
     * @param name scheduled task name
     */
    public void runScheduled(String name) {
        Runnable runnable = superScheduledConfig.getRunnable(name);
        runnable.run();
    }
}
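The manager changes a schedule by cancelling the stored ScheduledFuture and creating a new one from the cached Runnable. The sketch below reproduces that idea with the JDK's ScheduledExecutorService instead of Spring's ThreadPoolTaskScheduler. It is an illustration only: RescheduleSketch and all of its method names are hypothetical, and it models fixed-rate tasks only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical miniature of the manager: cancel the old future, schedule a new one. */
class RescheduleSketch {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    // name -> task logic; kept even while the task is stopped
    private final Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
    // name -> handle of the running schedule; absent while the task is stopped
    private final Map<String, ScheduledFuture<?>> nameToFuture = new ConcurrentHashMap<>();

    /** Registers the task logic and starts it at the given fixed rate. */
    void start(String name, Runnable task, long fixedRateMillis) {
        if (nameToFuture.containsKey(name)) {
            throw new IllegalStateException("Task " + name + " has already been started");
        }
        nameToRunnable.put(name, task);
        nameToFuture.put(name,
                scheduler.scheduleAtFixedRate(task, 0, fixedRateMillis, TimeUnit.MILLISECONDS));
    }

    /** Stops the task but keeps its logic so that it can be started again. */
    void cancel(String name) {
        ScheduledFuture<?> future = nameToFuture.remove(name);
        if (future != null) {
            future.cancel(true);
        }
    }

    /** Changes the period: terminate the running schedule, then create a new one. */
    void setFixedRate(String name, long fixedRateMillis) {
        Runnable task = nameToRunnable.get(name);
        if (task == null) {
            throw new IllegalArgumentException("Unknown task " + name);
        }
        cancel(name);
        start(name, task, fixedRateMillis);
    }

    boolean isRunning(String name) {
        return nameToFuture.containsKey(name);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        RescheduleSketch manager = new RescheduleSketch();
        manager.start("demo.tick", () -> System.out.println("tick"), 1000);
        manager.setFixedRate("demo.tick", 200); // old schedule is cancelled first
        manager.cancel("demo.tick");
        manager.shutdown();
    }
}
```

Unlike cancelScheduled() in the listing above, this sketch checks the looked-up future for null, so cancelling a task that is not running is a no-op rather than an error.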
2. Enhancer interface implementation
The enhancers follow the same overall idea as Spring AOP, but the implementation is much simpler than AOP.
(1) Enhancer interface
@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
    /**
     * Before enhancement: runs before the task method
     *
     * @param bean   bean instance (or proxied bean)
     * @param method the method to execute
     * @param args   method arguments
     */
    void before(Object bean, Method method, Object[] args);

    /**
     * After enhancement: runs after the task method
     * Not executed if an exception occurs
     * If no exception occurs, it runs after the afterFinally method
     *
     * @param bean   bean instance (or proxied bean)
     * @param method the method to execute
     * @param args   method arguments
     */
    void after(Object bean, Method method, Object[] args);

    /**
     * Exception enhancement: runs when the task method throws
     *
     * @param bean   bean instance (or proxied bean)
     * @param method the method to execute
     * @param args   method arguments
     */
    void exception(Object bean, Method method, Object[] args);

    /**
     * Finally enhancement: always runs, even if an exception occurs
     *
     * @param bean   bean instance (or proxied bean)
     * @param method the method to execute
     * @param args   method arguments
     */
    void afterFinally(Object bean, Method method, Object[] args);
}
(2) Proxy abstract class
public abstract class Point {
    /**
     * Scheduled task name
     */
    private String superScheduledName;

    /**
     * Abstract execution method, implemented by the proxy
     *
     * @param runnable scheduled task executor
     */
    public abstract Object invoke(SuperScheduledRunnable runnable);

    /* Ordinary getters/setters omitted */
}
(3) Call chain class
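public class Chain {
    private List<Point> list;
    private int index = -1;

    public Chain(List<Point> list) {
        this.list = list;
    }

    /**
     * Increment the index by 1 and return the new value
     */
    public int incIndex() {
        return ++index;
    }

    /**
     * Reset the index
     */
    public void resetIndex() {
        this.index = -1;
    }

    /* Ordinary getters/setters omitted */
}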
(4) cglib dynamic proxy implementation
cglib is used to proxy every enhancer into a call chain node (Point).
public class RunnableBaseInterceptor implements MethodInterceptor {
    /**
     * Scheduled task executor
     */
    private SuperScheduledRunnable runnable;

    /**
     * Scheduled task enhancer
     */
    private BaseStrengthen strengthen;

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        Object result;
        //If the invoke() method is being executed
        if ("invoke".equals(method.getName())) {
            //Before enhancement
            strengthen.before(obj, method, args);
            try {
                //Call the invoke() method of the executor
                result = runnable.invoke();
            } catch (Exception e) {
                //Exception enhancement
                strengthen.exception(obj, method, args);
                throw new SuperScheduledException(strengthen.getClass() + " An error occurred during enhancer execution", e);
            } finally {
                //Finally enhancement: always runs, even if an exception occurs
                strengthen.afterFinally(obj, method, args);
            }
            //After enhancement
            strengthen.after(obj, method, args);
        } else {
            //Execute any other method directly
            result = methodProxy.invokeSuper(obj, args);
        }
        return result;
    }

    public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
        this.runnable = runnable;
        if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
            this.strengthen = (BaseStrengthen) object;
        } else {
            throw new SuperScheduledException(object.getClass() + " object is not of type BaseStrengthen");
        }
    }

    public RunnableBaseInterceptor() {
    }
}
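The order in which the four hooks fire can be checked with a small stand-alone program. This sketch swaps cglib for the JDK's java.lang.reflect.Proxy, which can only proxy interfaces, so the hypothetical Node interface replaces the abstract Point class and a plain list of strings replaces the BaseStrengthen callbacks. It is not the library's code, only the same before / exception / afterFinally / after structure.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class InterceptorOrderSketch {

    /** Hypothetical node interface; the article's Point is an abstract class proxied by cglib. */
    interface Node {
        Object invoke();
    }

    /** Wraps target in a JDK proxy that records the hook order around invoke(). */
    static Node wrap(Node target, List<String> log) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            if (!"invoke".equals(method.getName())) {
                // toString(), hashCode() and equals() pass straight through
                return method.invoke(target, methodArgs);
            }
            Object result;
            log.add("before");
            try {
                result = target.invoke();
            } catch (RuntimeException e) {
                log.add("exception");
                throw new IllegalStateException("task failed", e);
            } finally {
                log.add("afterFinally"); // runs on success and on failure
            }
            log.add("after"); // reached only when no exception was thrown
            return result;
        };
        return (Node) Proxy.newProxyInstance(
                Node.class.getClassLoader(), new Class<?>[] {Node.class}, handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        wrap(() -> "done", log).invoke();
        System.out.println(log); // [before, afterFinally, after]

        log.clear();
        try {
            wrap(() -> { throw new IllegalArgumentException("boom"); }, log).invoke();
        } catch (IllegalStateException expected) {
            System.out.println(log); // [before, exception, afterFinally]
        }
    }
}
```

The output confirms the note in the BaseStrengthen Javadoc: on success, after runs later than afterFinally, and on failure after is skipped.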
(5) Timing task executor implementation
public class SuperScheduledRunnable {
    /**
     * Original method
     */
    private Method method;

    /**
     * The bean that owns the method
     */
    private Object bean;

    /**
     * Call chain of the enhancers
     */
    private Chain chain;

    public Object invoke() {
        Object result;
        //Increment the index by 1
        if (chain.incIndex() == chain.getList().size()) {
            //All enhancers in the call chain have been executed
            try {
                //Reset the call chain index
                chain.resetIndex();
                //Execute the original method
                result = method.invoke(bean);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new SuperScheduledException(e.getLocalizedMessage());
            }
        } else {
            //Get the proxied enhancer at the current index
            Point point = chain.getList().get(chain.getIndex());
            //Execute the enhancer proxy
            //The proxy calls back into this executor, which forms the call chain and runs the enhancers one by one
            result = point.invoke(this);
        }
        return result;
    }

    /* Ordinary getters/setters omitted */
}
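The index-driven recursion between the executor and the chain nodes is easier to follow in isolation. The following is a minimal sketch, assuming a hypothetical Node interface in place of the proxied Point and a Runner class that keeps the list and index inline instead of in a separate Chain object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CallChainSketch {

    /** Hypothetical chain node: does its own work, then calls back into the runner. */
    interface Node {
        Object invoke(Runner runner);
    }

    /** Hypothetical miniature of SuperScheduledRunnable with the chain index kept inline. */
    static class Runner {
        private final List<Node> nodes;
        private final Runnable original;
        private int index = -1;

        Runner(List<Node> nodes, Runnable original) {
            this.nodes = nodes;
            this.original = original;
        }

        Object invoke() {
            if (++index == nodes.size()) {
                index = -1;      // reset so that the next run starts at the first node
                original.run();  // every node has run: execute the original task
                return null;
            }
            return nodes.get(index).invoke(this); // the node calls invoke() again
        }
    }

    /** Builds a node that logs before and after delegating to the rest of the chain. */
    static Node logging(String label, List<String> log) {
        return runner -> {
            log.add(label + ":before");
            Object result = runner.invoke();
            log.add(label + ":after");
            return result;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Runner runner = new Runner(
                Arrays.asList(logging("A", log), logging("B", log)),
                () -> log.add("task"));
        runner.invoke();
        System.out.println(log); // [A:before, B:before, task, B:after, A:after]
    }
}
```

Because the index is mutable state shared by every run, this sketch (like the Chain class it imitates) assumes that two executions of the same task never overlap.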
(6) Enhancer proxy logic
Code snippet from the com.gyx.superscheduled.core.SuperScheduledApplicationRunner class:
//Create the execution controller
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
//List that holds the proxy objects of the enhancers
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
//Loop over the bean names of all enhancers
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
    //Get the enhancer bean
    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
    //Proxy the enhancer into a Point node
    Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
    proxy.setSuperScheduledName(name);
    //Cache the enhancer proxy in the list
    points.add(proxy);
}
//Build a call chain from the enhancer proxies and set it on the execution controller
runnable.setChain(new Chain(points));