A general solution for dynamically managing Spring Boot scheduled tasks

Programmer Zhuifeng 2023-05-20 22:00 Published in Hunan

Original:
blog.csdn.net/qq_34886352/article/details/106494637

1. Function description

This tool enhances Spring Boot's scheduled tasks by adding dynamic management of the native scheduled tasks. It is fully compatible with the native @Scheduled annotation, so existing scheduled tasks do not need to be modified.

2. Quick start

The functionality is packaged as a Spring Boot starter, so it works out of the box once the dependency is added:

<dependency>
    <groupId>com.github.guoyixing</groupId>
    <artifactId>spring-boot-starter-super-scheduled</artifactId>
    <version>0.3.1</version>
</dependency>

3. Implementation principle

1. Implementing dynamic management

(1) The configuration manager

@Component("superScheduledConfig")
public class SuperScheduledConfig {
    /**
     * Thread pool that executes the scheduled tasks
     */
    private ThreadPoolTaskScheduler taskScheduler;

    /**
     * Maps each scheduled task name to the ScheduledFuture handle of its running schedule
     */
    private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();

    /**
     * Maps each scheduled task name to the logic that the task executes
     */
    private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();

    /**
     * Maps each scheduled task name to the source information of the task
     */
    private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();

    /* Plain getters/setters omitted */
}

(2) Use a post-processor to intercept Spring Boot's original scheduled tasks

  • Implement the ApplicationContextAware interface to obtain the Spring Boot application context

  • Implement the BeanPostProcessor interface, which marks this class as a post-processor; its callbacks run for every bean around the bean's initialization

  • Use the @DependsOn annotation to declare a dependency on SuperScheduledConfig, so that Spring Boot creates SuperScheduledConfig before it creates SuperScheduledPostProcessor

  • The main logic lives in the postProcessAfterInitialization() method

@DependsOn({"superScheduledConfig"})
@Component
@Order
public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());

    private ApplicationContext applicationContext;

    /**
     * Called before each bean is initialized; nothing to do here
     * @param bean bean instance
     * @param beanName name of the bean
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Called after each bean has been initialized
     * @param bean bean instance
     * @param beanName name of the bean
     */
    @Override
    public Object postProcessAfterInitialization(Object bean,
                                                 String beanName) throws BeansException {
        // 1. Get the configuration manager
        SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);

        // 2. Get all methods declared by the bean that was just initialized
        Method[] methods = bean.getClass().getDeclaredMethods();
        // Process the methods one by one
        for (Method method : methods) {
            // 3. Try to read the @Scheduled annotation (Spring's scheduled task annotation) from the method
            Scheduled annotation = method.getAnnotation(Scheduled.class);
            // Skip methods that do not carry @Scheduled
            if (annotation == null) {
                continue;
            }
            // 4. Create the source data of the scheduled task. It records the task's configuration;
            //    at initialization it holds the original attributes of the annotation.
            ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);
            // Validate the attributes read from the annotation
            if (!scheduledSource.check()) {
                throw new SuperScheduledException("Invalid @Scheduled parameters on method " + method.getName() + " of bean " + beanName);
            }
            // Build the name (id) of the scheduled task: beanName + "." + method name
            String name = beanName + "." + method.getName();
            // Store the source data in the configuration manager (key: task name, value: source data)
            superScheduledConfig.addScheduledSource(name, scheduledSource);
            try {
                // 5. Disable the original Spring Boot scheduled task
                clearOriginalScheduled(annotation);
            } catch (Exception e) {
                throw new SuperScheduledException("Failed to disable the original scheduled method " + beanName + "." + method.getName(), e);
            }
        }
        // Return the bean unchanged
        return bean;
    }

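    /**
     * Overwrite the attributes of the annotation so that Spring no longer schedules the method itself
     * (the changeAnnotationValue helper is not shown in this article)
     * @param annotation annotation instance
     * @throws Exception if an attribute cannot be changed
     */
    private void clearOriginalScheduled(Scheduled annotation) throws Exception {
        changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);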
        changeAnnotationValue(annotation, "fixedDelay", -1L);
        changeAnnotationValue(annotation, "fixedDelayString", "");
        changeAnnotationValue(annotation, "fixedRate", -1L);
        changeAnnotationValue(annotation, "fixedRateString", "");
        changeAnnotationValue(annotation, "initialDelay", -1L);
        changeAnnotationValue(annotation, "initialDelayString", "");
    }


    /**
     * Receive the Spring Boot application context
     * @param applicationContext Spring Boot application context
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
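The scanning step in the post-processor can be tried out without Spring. The following is a minimal sketch, not the starter's code: DemoScheduled, ReportJob and ScheduledMethodScanner are hypothetical names, and DemoScheduled stands in for Spring's @Scheduled so that the example runs on a plain JDK. It reads the annotation from each declared method by reflection and builds the task name with the same beanName + "." + method name rule.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Spring's @Scheduled (assumption, so no Spring dependency is needed)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoScheduled {
    String cron() default "";
}

// Hypothetical bean with one annotated method and one plain method
class ReportJob {
    @DemoScheduled(cron = "0/5 * * * * ?")
    public void sendReport() {
    }

    public void helper() {
    }
}

class ScheduledMethodScanner {
    /** Returns task name -> cron expression for every annotated method declared by the bean. */
    static Map<String, String> scan(String beanName, Object bean) {
        Map<String, String> found = new TreeMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoScheduled annotation = method.getAnnotation(DemoScheduled.class);
            if (annotation == null) {
                continue; // not a scheduled method, skip it
            }
            // Same naming rule as the article: beanName + "." + method name
            found.put(beanName + "." + method.getName(), annotation.cron());
        }
        return found;
    }
}
```

Calling ScheduledMethodScanner.scan("reportJob", new ReportJob()) yields a single entry keyed "reportJob.sendReport"; the helper() method is ignored because it carries no annotation.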

(3) Use ApplicationRunner to initialize the custom scheduled task runner

  • Implement the ApplicationContextAware interface to obtain the Spring Boot application context

  • Use the @DependsOn annotation to declare a dependency on the threadPoolTaskScheduler bean

  • Implement the ApplicationRunner interface to run custom logic once all beans have been initialized

  • The main logic lives in the run() method

@DependsOn("threadPoolTaskScheduler")
@Component
public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private ApplicationContext applicationContext;

    /**
     * Scheduled task configuration manager
     */
    @Autowired
    private SuperScheduledConfig superScheduledConfig;
    /**
     * Thread pool that executes the scheduled tasks
     */
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Override
    public void run(ApplicationArguments args) {
        // 1. Cache the task scheduler thread pool in the configuration manager
        superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);
        // 2. Get the source data of all scheduled tasks
        Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();
        // Process the scheduled tasks one by one
        for (String name : nameToScheduledSource.keySet()) {
            // 3. Get the source data of this scheduled task
            ScheduledSource scheduledSource = nameToScheduledSource.get(name);
            // 4. Get the bean names of all enhancers
            String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);
            // 5. Create the execution controller
            SuperScheduledRunnable runnable = new SuperScheduledRunnable();
            // Configure the execution controller
            runnable.setMethod(scheduledSource.getMethod());
            runnable.setBean(scheduledSource.getBean());
            // 6. Process the enhancers one by one (how the enhancers work is analyzed later)
            List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
            for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
                // 7. Wrap the enhancer in a Point proxy
                Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
                // Create the proxy
                Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
                proxy.setSuperScheduledName(name);
                // 8. Collect all the points
                points.add(proxy);
            }
            // Link the points into a call chain
            runnable.setChain(new Chain(points));
            // Cache the wrapped execution logic in the configuration manager
            superScheduledConfig.addRunnable(name, runnable::invoke);
            try {
                // 9. Start the scheduled task
                ScheduledFuture<?> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler,
                        scheduledSource, runnable::invoke);
                // Store the ScheduledFuture handle in the configuration manager
                superScheduledConfig.addScheduledFuture(name, schedule);
                logger.info(df.format(LocalDateTime.now()) + " task " + name + " started...");

            } catch (Exception e) {
                throw new SuperScheduledException("Task " + name + " failed to start, error message: " + e.getLocalizedMessage());
            }
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

(4) Perform dynamic management

@Component
public class SuperScheduledManager {
    protected final Log logger = LogFactory.getLog(getClass());
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private SuperScheduledConfig superScheduledConfig;

    /**
     * Change the cron expression of a scheduled task
     *
     * @param name name of the scheduled task
     * @param cron cron expression
     */
    public void setScheduledCron(String name, String cron) {
        // Cancel the running task
        cancelScheduled(name);
        // Start a new task with the updated source data
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource);
    }

    /**
     * Change the fixedDelay of a scheduled task
     *
     * @param name name of the scheduled task
     * @param fixedDelay delay between the end of one execution and the start of the next
     */
    public void setScheduledFixedDelay(String name, Long fixedDelay) {
        // Cancel the running task
        cancelScheduled(name);
        // Start a new task with the updated source data
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedDelay(fixedDelay);
        addScheduled(name, scheduledSource);
    }

    /**
     * Change the fixedRate of a scheduled task
     *
     * @param name name of the scheduled task
     * @param fixedRate interval between the starts of successive executions
     */
    public void setScheduledFixedRate(String name, Long fixedRate) {
        // Cancel the running task
        cancelScheduled(name);
        // Start a new task with the updated source data
        ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);
        scheduledSource.clear();
        scheduledSource.setFixedRate(fixedRate);
        addScheduled(name, scheduledSource);
    }

    /**
     * List the names of all running scheduled tasks
     */
    public List<String> getRunScheduledName() {
        Set<String> names = superScheduledConfig.getNameToScheduledFuture().keySet();
        return new ArrayList<>(names);
    }

    /**
     * List the names of all managed scheduled tasks, running or not
     */
    public List<String> getAllSuperScheduledName() {
        Set<String> names = superScheduledConfig.getNameToRunnable().keySet();
        return new ArrayList<>(names);
    }

    /**
     * Cancel a scheduled task
     *
     * @param name name of the scheduled task
     */
    public void cancelScheduled(String name) {
        ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);
        scheduledFuture.cancel(true);
        superScheduledConfig.removeScheduledFuture(name);
        logger.info(df.format(LocalDateTime.now()) + " task " + name + " terminated...");
    }

    /**
     * Start a scheduled task
     *
     * @param name name of the scheduled task
     * @param scheduledSource source information of the scheduled task
     */
    public void addScheduled(String name, ScheduledSource scheduledSource) {
        if (getRunScheduledName().contains(name)) {
            throw new SuperScheduledException("Scheduled task " + name + " has already been started");
        }
        if (!scheduledSource.check()) {
            throw new SuperScheduledException("Scheduled task " + name + " has invalid source data");
        }

        scheduledSource.refreshType();

        Runnable runnable = superScheduledConfig.getRunnable(name);
        ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();

        ScheduledFuture<?> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);
        logger.info(df.format(LocalDateTime.now()) + " task " + name + " started...");

        superScheduledConfig.addScheduledSource(name, scheduledSource);
        superScheduledConfig.addScheduledFuture(name, schedule);
    }

    /**
     * Start a scheduled task driven by a cron expression
     *
     * @param name name of the scheduled task
     * @param cron cron expression
     */
    public void addCronScheduled(String name, String cron) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setCron(cron);

        addScheduled(name, scheduledSource);
    }

    /**
     * Start a scheduled task driven by fixedDelay
     *
     * @param name name of the scheduled task
     * @param fixedDelay delay between the end of one execution and the start of the next
     * @param initialDelay delay before the first execution (at most one value)
     */
    public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedDelay(fixedDelay);
        if (initialDelay != null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {
            throw new SuperScheduledException("Only one value may be passed for the delay before the first execution");
        }

        addScheduled(name, scheduledSource);
    }

    /**
     * Start a scheduled task driven by fixedRate
     *
     * @param name name of the scheduled task
     * @param fixedRate interval between the starts of successive executions
     * @param initialDelay delay before the first execution (at most one value)
     */
    public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {
        ScheduledSource scheduledSource = new ScheduledSource();
        scheduledSource.setFixedRate(fixedRate);
        if (initialDelay != null && initialDelay.length == 1) {
            scheduledSource.setInitialDelay(initialDelay[0]);
        } else if (initialDelay != null && initialDelay.length > 1) {
            throw new SuperScheduledException("Only one value may be passed for the delay before the first execution");
        }

        addScheduled(name, scheduledSource);
    }

    /**
     * Run a scheduled task once, manually
     *
     * @param name name of the scheduled task
     */
    public void runScheduled(String name) {
        Runnable runnable = superScheduledConfig.getRunnable(name);
        runnable.run();
    }
}
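The cancel-then-recreate pattern used by the manager can be illustrated with the JDK alone. This is a minimal sketch under stated assumptions, not the starter's implementation: DynamicTaskRegistry and its method names are hypothetical, and a plain ScheduledExecutorService is swapped in for Spring's ThreadPoolTaskScheduler. As in the article, the task logic is kept in one map and the handle of the running schedule in another, so the schedule can be replaced without touching the logic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical registry; a JDK executor replaces ThreadPoolTaskScheduler (assumption)
class DynamicTaskRegistry {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    // Task name -> logic to execute
    private final Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
    // Task name -> handle of the currently running schedule
    private final Map<String, ScheduledFuture<?>> nameToFuture = new ConcurrentHashMap<>();

    /** Registers the task logic and starts it at the given fixed rate. */
    void register(String name, Runnable task, long fixedRateMillis) {
        nameToRunnable.put(name, task);
        start(name, fixedRateMillis);
    }

    /** Cancels the running schedule, then starts the same logic with a new rate. */
    void setFixedRate(String name, long fixedRateMillis) {
        cancel(name);
        start(name, fixedRateMillis);
    }

    /** Returns true if a running schedule was found and cancelled. */
    boolean cancel(String name) {
        ScheduledFuture<?> future = nameToFuture.remove(name);
        return future != null && future.cancel(true);
    }

    boolean isRunning(String name) {
        return nameToFuture.containsKey(name);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    private void start(String name, long fixedRateMillis) {
        Runnable task = nameToRunnable.get(name);
        if (task == null) {
            throw new IllegalArgumentException("Unknown task: " + name);
        }
        if (nameToFuture.containsKey(name)) {
            throw new IllegalStateException("Task " + name + " has already been started");
        }
        nameToFuture.put(name,
                executor.scheduleAtFixedRate(task, 0, fixedRateMillis, TimeUnit.MILLISECONDS));
    }
}
```

A caller might register a task with a long period and later call setFixedRate("demo.job", 10) to speed it up. The name stays the same; only the ScheduledFuture behind it is replaced.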

2. Implementing the enhancement interface

The overall design of the enhancers follows the same idea as Spring AOP, but the implementation is much simpler.

(1) The enhancement interface

@Order(Ordered.HIGHEST_PRECEDENCE)
public interface BaseStrengthen {
    /**
     * Before enhancement: runs before the task
     *
     * @param bean bean instance (or the proxied bean)
     * @param method the method about to be executed
     * @param args method arguments
     */
    void before(Object bean, Method method, Object[] args);

    /**
     * After enhancement
     * Not executed if an exception occurs
     * If no exception occurs, it runs after the afterFinally method
     *
     * @param bean bean instance (or the proxied bean)
     * @param method the method about to be executed
     * @param args method arguments
     */
    void after(Object bean, Method method, Object[] args);

    /**
     * Exception enhancement: runs when the task throws
     *
     * @param bean bean instance (or the proxied bean)
     * @param method the method about to be executed
     * @param args method arguments
     */
    void exception(Object bean, Method method, Object[] args);

    /**
     * Finally enhancement: always runs, even if an exception occurs
     *
     * @param bean bean instance (or the proxied bean)
     * @param method the method about to be executed
     * @param args method arguments
     */
    void afterFinally(Object bean, Method method, Object[] args);
}

(2) The abstract proxy class

public abstract class Point {
    /**
     * Name of the scheduled task
     */
    private String superScheduledName;

    /**
     * Abstract execution method; the implementation is supplied by the proxy
     * @param runnable scheduled task executor
     */
    public abstract Object invoke(SuperScheduledRunnable runnable);

    /* Plain getters/setters omitted */
}

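(3) The call chain class

public class Chain {
    private List<Point> list;
    private int index = -1;

    // Constructor and getters as used by the other snippets in this article
    public Chain(List<Point> list) {
        this.list = list;
    }

    public List<Point> getList() {
        return list;
    }

    public int getIndex() {
        return index;
    }

    /**
     * Increment the index by 1 and return the new value
     */
    public int incIndex() {
        return ++index;
    }

    /**
     * Reset the index to its initial value
     */
    public void resetIndex() {
        this.index = -1;
    }
}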

(4) The cglib dynamic proxy

cglib is used to wrap every enhancer in a proxy of Point, which turns each enhancer into a node of the call chain.

public class RunnableBaseInterceptor implements MethodInterceptor {
    /**
     * Scheduled task executor
     */
    private SuperScheduledRunnable runnable;
    /**
     * Enhancer for the scheduled task
     */
    private BaseStrengthen strengthen;

    @Override
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        Object result;
        // Only the invoke() method is enhanced
        if ("invoke".equals(method.getName())) {
            // Before enhancement
            strengthen.before(obj, method, args);
            try {
                // Call invoke() on the executor
                result = runnable.invoke();
            } catch (Exception e) {
                // Exception enhancement
                strengthen.exception(obj, method, args);
                throw new SuperScheduledException(strengthen.getClass() + " An error occurred during enhanced execution", e);
            } finally {
                // Finally enhancement: runs even if an exception occurs
                strengthen.afterFinally(obj, method, args);
            }
            // After enhancement: reached only if no exception occurred
            strengthen.after(obj, method, args);

        } else {
            // Any other method is executed directly
            result = methodProxy.invokeSuper(obj, args);
        }
        return result;
    }

    public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {
        this.runnable = runnable;
        if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {
            this.strengthen = (BaseStrengthen) object;
        } else {
            throw new SuperScheduledException(object.getClass() + "object is not of type BaseStrengthen");
        }
    }

    public RunnableBaseInterceptor() {

    }
}
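The order in which the four callbacks fire follows from the try/catch/finally layout of intercept(). The sketch below is only an illustration: CallbackOrderDemo is a hypothetical class that records step names instead of calling a real BaseStrengthen, and it rethrows the original exception where the article wraps it in SuperScheduledException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo that mirrors the control flow of intercept()
class CallbackOrderDemo {
    /**
     * Runs the task inside the same try/catch/finally layout as intercept()
     * and records the name of each enhancer callback as it would fire.
     */
    static List<String> run(Runnable task) {
        List<String> steps = new ArrayList<>();
        try {
            steps.add("before");
            try {
                task.run();
            } catch (RuntimeException e) {
                steps.add("exception");
                throw e; // the article wraps this in SuperScheduledException
            } finally {
                steps.add("afterFinally"); // always runs
            }
            steps.add("after"); // reached only when the task did not throw
        } catch (RuntimeException e) {
            steps.add("rethrown"); // the caller sees the failure
        }
        return steps;
    }
}
```

For a task that completes normally the recorded steps are before, afterFinally, after. For a task that throws they are before, exception, afterFinally, and then the exception propagates to the caller, so after is skipped.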

(5) The scheduled task executor

public class SuperScheduledRunnable {
    /**
     * The original method
     */
    private Method method;
    /**
     * The bean that owns the method
     */
    private Object bean;
    /**
     * The call chain of the enhancers
     */
    private Chain chain;


    public Object invoke() {
        Object result;
        // Advance the index by 1
        if (chain.incIndex() == chain.getList().size()) {
            // Every enhancer in the call chain has been executed
            try {
                // Reset the call chain index
                chain.resetIndex();
                // All enhancers are done, so execute the original method
                result = method.invoke(bean);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new SuperScheduledException(e.getLocalizedMessage());
            }
        } else {
            // Get the next proxied enhancer
            Point point = chain.getList().get(chain.getIndex());
            // Execute the enhancer proxy. The proxy calls back into this executor,
            // which forms the call chain and runs the enhancers one by one.
            result = point.invoke(this);
        }
        return result;
    }

    /* Plain getters/setters omitted */
}
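The index-driven recursion above can be traced with a small standalone model. This is a sketch under stated assumptions: Link, ChainRunner and ChainDemo are hypothetical names, Link is a simplified stand-in for the proxied Point, and a Supplier replaces the reflective call to the original method. Each link calls back into the runner, so the links nest around the target like layers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical simplified stand-in for a proxied Point
interface Link {
    Object invoke(ChainRunner runner);
}

// Hypothetical stand-in for SuperScheduledRunnable plus Chain
class ChainRunner {
    private final List<Link> links;
    private final Supplier<Object> target;
    private int index = -1;

    ChainRunner(List<Link> links, Supplier<Object> target) {
        this.links = links;
        this.target = target;
    }

    Object invoke() {
        if (++index == links.size()) {
            index = -1;          // reset so the runner can be invoked again
            return target.get(); // every link has run: call the original logic
        }
        return links.get(index).invoke(this); // the link calls back into invoke()
    }
}

class ChainDemo {
    /** Runs a two-link chain the given number of times and returns the recorded steps. */
    static List<String> trace(int runs) {
        List<String> log = new ArrayList<>();
        List<Link> links = new ArrayList<>();
        for (String name : new String[] {"A", "B"}) {
            links.add(runner -> {
                log.add(name + ":before");
                Object result = runner.invoke(); // continue down the chain
                log.add(name + ":after");
                return result;
            });
        }
        ChainRunner runner = new ChainRunner(links, () -> {
            log.add("target");
            return "done";
        });
        for (int i = 0; i < runs; i++) {
            runner.invoke();
        }
        return log;
    }
}
```

One run records A:before, B:before, target, B:after, A:after. Because the index is a field shared by every invocation, overlapping invocations of the same runner would interfere with each other in this sketch.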

(6) Enhancer proxy logic

The following snippet is from the com.gyx.superscheduled.core.SuperScheduledApplicationRunner class:

// Create the execution controller
SuperScheduledRunnable runnable = new SuperScheduledRunnable();
runnable.setMethod(scheduledSource.getMethod());
runnable.setBean(scheduledSource.getBean());
// Holds the proxy objects of the enhancers
List<Point> points = new ArrayList<>(baseStrengthenBeanNames.length);
// Loop over the bean names of all enhancers
for (String baseStrengthenBeanName : baseStrengthenBeanNames) {
    // Get the enhancer bean
    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);
    // Wrap the enhancer in a Point proxy node
    Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));
    proxy.setSuperScheduledName(name);
    // Collect the enhancer proxy in the list
    points.add(proxy);
}
// Build a call chain from the enhancer proxies
// and set it on the execution controller
runnable.setChain(new Chain(points));
