Building a reliable scheduled-task system on Quartz

Preface

Not long ago, I migrated a single-node Quartz setup to a multi-node one. I used Quartz's own clustering solution, the database-backed job store (MySQL), because it is simple and easy to use. This article records some implementation details and pitfalls. The focus is on how to write tasks, how to recover from interruptions, and how to avoid task problems caused by restarts.

Issue Log

  • The job serialization pitfall (Spring MethodInvoker)
    The project used MethodInvokingJobDetailFactoryBean to configure close to a hundred tasks in XML, but once Quartz was started in clustered mode it threw a NotSerializableException. Why? The source code shows that the factory bean puts itself (this) into the jobDataMap under the key methodInvoker. It is a bean in the Spring context and is not serializable, so it cannot be persisted to the database. The Javadoc of MethodInvokingJobDetailFactoryBean also states clearly that persistent job stores are not supported; if you need persistence, you have to write your own.
// MethodInvokingJobDetailFactoryBean#afterPropertiesSet
@Override
public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
    prepare();

    // Use specific name if given, else fall back to bean name.
    String name = (this.name != null ? this.name : this.beanName);

    // Consider the concurrent flag to choose between stateful and stateless job.
    Class<? extends Job> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);

    // Build JobDetail instance.
    JobDetailImpl jdi = new JobDetailImpl();
    jdi.setName(name != null ? name : toString());
    jdi.setGroup(this.group);
    // jobClass is one of the inner classes MethodInvokingJob or StatefulMethodInvokingJob
    jdi.setJobClass(jobClass);
    jdi.setDurability(true);
    // The factory bean itself goes into the map; the inner class MethodInvokingJob calls it
    jdi.getJobDataMap().put("methodInvoker", this);
    this.jobDetail = jdi;
    postProcessJobDetail(this.jobDetail);
}

public static class MethodInvokingJob extends QuartzJobBean {...}

@DisallowConcurrentExecution
public static class StatefulMethodInvokingJob extends MethodInvokingJob {
}

So I wrote my own version modeled on the original, adding interruption recovery and proxy-bean handling.

public class CustomizedMethodInvokingJobDetailFactoryBean extends ArgumentConvertingMethodInvoker
        implements FactoryBean<JobDetail>, BeanNameAware, BeanClassLoaderAware, BeanFactoryAware, InitializingBean, ApplicationContextAware {
    // Maps the real (unproxied) class name to the bean, which may be a proxy
    private static final ConcurrentHashMap<String, Object> realClassName2ProxyObject = new ConcurrentHashMap<>();

    private static final Logger LOG = LoggerFactory.getLogger(CustomizedMethodInvokingJobDetailFactoryBean.class);

    @Nullable
    private String name;

    private String group = Scheduler.DEFAULT_GROUP;

    private boolean concurrent = true;

    @Nullable
    private String targetBeanName;

    @Nullable
    private String beanName;

    @Nullable
    private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();

    @Nullable
    private BeanFactory beanFactory;

    @Nullable
    private JobDetail jobDetail;

    /**
     * Whether to re-run the job after it was interrupted.
     * Interruption is detected from the records in the database table, so if you enable this, make the task idempotent.
     */
    private boolean requestsRecovery = false;

    private static ApplicationContext applicationContext;

    public void setName(String name) {
        this.name = name;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public void setRequestsRecovery(boolean requestsRecovery) {
        this.requestsRecovery = requestsRecovery;
    }

    /**
     * Whether tasks can be executed concurrently
     */
    public void setConcurrent(boolean concurrent) {
        this.concurrent = concurrent;
    }

    public void setTargetBeanName(String targetBeanName) {
        this.targetBeanName = targetBeanName;
    }

    @Override
    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.beanClassLoader = classLoader;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    @Override
    protected Class<?> resolveClassName(String className) throws ClassNotFoundException {
        return ClassUtils.forName(className, this.beanClassLoader);
    }


    @Override
    public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
        prepare();
        // Use specific name if given, else fall back to bean name.
        String name = (this.name != null ? this.name : this.beanName);
        // Consider the concurrent flag to choose between stateful and stateless job.
        Class<? extends Job> jobClass = (this.concurrent ? BeanInvokingJob.class : StatefulBeanInvokingJob.class);
        // Build JobDetail instance.
        JobDetailImpl jdi = new JobDetailImpl();
        jdi.setName(name != null ? name : toString());
        jdi.setGroup(this.group);
        jdi.setJobClass(jobClass);
        jdi.setDurability(true);
        jdi.setRequestsRecovery(this.requestsRecovery);
        try {
            LOG.info("targetObject class name: {}", this.getTargetObject().getClass().getName());
            Object realObject = AopTargetUtils.getTarget(this.getTargetObject());
            jdi.getJobDataMap().put("targetClass", realObject.getClass().getName());
        } catch (Exception e) {
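            // Pass the exception as the last argument without a placeholder so the stack trace is logged
            LOG.error("Error resolving real class for job {}", name, e);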
            jdi.getJobDataMap().put("targetClass", ClassUtils.getUserClass(this.getTargetObject()).getName());
        }
        String targetClass = jdi.getJobDataMap().getString("targetClass");
        // Keep the mapping from the real class name to the bean.
        // containsKey, not contains: ConcurrentHashMap.contains(Object) checks values.
        if (realClassName2ProxyObject.containsKey(targetClass)) {
            LOG.error("Target class: {} has multiple beans/proxy beans", targetClass);
        } else {
            LOG.info("record targetClass:{} targetObject:{}", targetClass, this.getTargetObject());
            realClassName2ProxyObject.put(targetClass, this.getTargetObject());
        }
        jdi.getJobDataMap().put("targetMethod", this.getTargetMethod());
        this.jobDetail = jdi;
        postProcessJobDetail(this.jobDetail);
    }

    protected void postProcessJobDetail(JobDetail jobDetail) {
    }

    @Override
    public Class<?> getTargetClass() {
        Class<?> targetClass = super.getTargetClass();
        if (targetClass == null && this.targetBeanName != null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set when using 'targetBeanName'");
            targetClass = this.beanFactory.getType(this.targetBeanName);
        }
        return targetClass;
    }

    @Override
    public Object getTargetObject() {
        Object targetObject = super.getTargetObject();
        if (targetObject == null && this.targetBeanName != null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set when using 'targetBeanName'");
            targetObject = this.beanFactory.getBean(this.targetBeanName);
        }
        return targetObject;
    }


    @Override
    @Nullable
    public JobDetail getObject() {
        return this.jobDetail;
    }

    @Override
    public Class<? extends JobDetail> getObjectType() {
        return (this.jobDetail != null ? this.jobDetail.getClass() : JobDetail.class);
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }


    public static class BeanInvokingJob implements Job {
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            try {
                LOG.info("start");
                String targetClass = context.getMergedJobDataMap().getString("targetClass");
                Class<?> clazz = Class.forName(targetClass);

                String targetMethod = context.getMergedJobDataMap().getString("targetMethod");
                if (targetMethod == null) {
                    throw new JobExecutionException("targetMethod cannot be null.", false);
                }
                Object argumentsObject = context.getMergedJobDataMap().get("arguments");
                Object[] arguments = (argumentsObject instanceof String) ? null : (Object[]) argumentsObject;

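                Object bean = applicationContext.getBean(clazz);
                // containsKey, not contains: the lookup is by class name (the map key)
                if (realClassName2ProxyObject.containsKey(targetClass)) {
                    // Prefer the recorded bean, which may be a proxy
                    bean = realClassName2ProxyObject.get(targetClass);
                }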
                MethodInvoker beanMethod = new MethodInvoker();
                beanMethod.setTargetObject(bean);
                beanMethod.setTargetMethod(targetMethod);
                beanMethod.setArguments(arguments);
                beanMethod.prepare();
                LOG.info("Invoking Bean: {} ; Method: {}", clazz, targetMethod);
                beanMethod.invoke();
            } catch (JobExecutionException e) {
                throw e;
            } catch (Exception e) {
                throw new JobExecutionException(e);
            } finally {
                LOG.info("end");
            }
        }
    }

    @DisallowConcurrentExecution
    public static class StatefulBeanInvokingJob extends BeanInvokingJob {}

}
  • The dynamic proxy problem
    If a method invoked as a task has aspects applied to it, such as logging or transactions, the task must call the proxy bean rather than the raw target. The custom class above works with both proxy mechanisms (JDK dynamic proxies and CGLIB).
  • The misfire problem (missed executions)
    There are many good articles on misfires. A misfire is simply an execution that was missed for whatever reason, and the compensation strategies differ between SimpleTrigger and CronTrigger. For a CronTrigger whose task does not allow concurrency, set the misfire instruction to MISFIRE_INSTRUCTION_DO_NOTHING. Remember to configure org.quartz.jobStore.misfireThreshold, which defines how late a trigger may fire before it counts as misfired.
  • Safe restart (scheduler shutdown)
    This is mainly discussed below. Testing turned up some problems. The Quartz scheduler has waitForJobsToCompleteOnShutdown, which makes it wait for running tasks when it stops, but at first the Spring integration seemed broken and the scheduler did not wait. With a custom thread pool there was a second problem: when a finished task needed to update the database, the scheduler had already stopped, resulting in java.lang.IllegalStateException: JobStore is shutdown. After some adjustment it works: with Quartz's default thread pool and Spring's SchedulerFactoryBean, running tasks complete before the scheduler stops. The earlier problem occurred because the quartzScheduler was also registered in the Spring context, so the scheduler was destroyed twice when the context was destroyed. I will analyze the details separately.
  • Prohibiting concurrent execution of a task
    Use the @DisallowConcurrentExecution annotation, which the custom class above already includes. Note that the concurrency ban also holds in a distributed environment:
  1. It holds even when misfire compensation kicks in
  2. It holds even when a task is triggered manually, provided the manual trigger goes through the Quartz API
  3. It holds even during task interruption recovery
  • Task failure recovery
    The custom class above has a requestsRecovery attribute. Note, however, that a task that throws an exception still counts as having completed normally. Recovery is driven by the records in the database table qrtz_fired_triggers, so it only covers tasks that were cut off mid-run, not tasks that failed with an exception.
  • Triggering tasks manually
    Use scheduler.triggerJob(jobKey); to trigger a run. Note that the job may not execute immediately.
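
Since a recovered or manually re-triggered job is simply run again, the task body has to tolerate repeats. Below is a minimal sketch of one way to get there, written in plain JDK Java so that it runs without Quartz. The class IdempotentTask, the in-memory key set, and the business key are all hypothetical; in a real cluster the key set would have to be something shared between nodes, such as a table with a unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class IdempotentTask {
    // Hypothetical stand-in for a shared store (e.g. a DB table with a unique key).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    // Counts how many times the side effect actually happened.
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Runs the unit of work at most once per key. Returns true if work was done. */
    public boolean runOnce(String businessKey) {
        // add() returns false when the key is already present,
        // so only the first caller for a given key proceeds.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        sideEffects.incrementAndGet(); // the real work would go here
        return true;
    }

    public int sideEffectCount() {
        return sideEffects.get();
    }

    public static void main(String[] args) {
        IdempotentTask task = new IdempotentTask();
        System.out.println(task.runOnce("daily-report")); // first run
        System.out.println(task.runOnce("daily-report")); // simulated recovery re-run
        System.out.println(task.sideEffectCount());
    }
}
```

Claiming the key before doing the work means an interrupted run is never repeated. If a lost run is worse than a repeated one, record the key after the work instead, ideally in the same transaction as the work itself.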

Focus: some thoughts on writing scheduled tasks (for discussion only)

  • Short tasks + high-frequency scheduling vs. long tasks + low-frequency scheduling
    Given Quartz's thread-pool model, long-running tasks are an unfriendly fit, and they are hard to handle when it comes to interruption recovery and safe exit. Short-running tasks are probably the better approach.
  • Task exception handling
    Almost all exceptions should be caught and handled inside the task, because the scheduling platform does not know what to do with an exception thrown up to it.
  • How to shut down safely and cope with interruptions
  1. If all tasks are short, the scheduler can be set to shut down only after running tasks complete: set waitForJobsToCompleteOnShutdown to true on Spring's SchedulerFactoryBean and use Quartz's own thread pool.
// SchedulerFactoryBean#destroy
@Override
public void destroy() throws SchedulerException {
    if (this.scheduler != null) {
        logger.info("Shutting down Quartz Scheduler");
        this.scheduler.shutdown(this.waitForJobsToCompleteOnShutdown);
    }
}

// QuartzScheduler#shutdown
public void shutdown(boolean waitForJobsToComplete) {
    //...
    schedThread.halt(waitForJobsToComplete);

    notifySchedulerListenersShuttingdown();

    if ((resources.isInterruptJobsOnShutdown() && !waitForJobsToComplete) ||
            (resources.isInterruptJobsOnShutdownWithWait() && waitForJobsToComplete)) {
        List<JobExecutionContext> jobs = getCurrentlyExecutingJobs();
        for (JobExecutionContext job : jobs) {
            if (job.getJobInstance() instanceof InterruptableJob)
                try {
                    ((InterruptableJob) job.getJobInstance()).interrupt();
                } catch (Throwable e) {
                    // do nothing, this was just a courtesy effort
                    getLog().warn("Encountered error when interrupting job {} during shutdown: {}", job.getJobDetail().getKey(), e);
                }
        }
    }
    // If you use a custom thread pool, nothing happens here
    resources.getThreadPool().shutdown(waitForJobsToComplete);

    closed = true;
    //...
}
  2. If some tasks run for a long time, it is better not to set waitForJobsToCompleteOnShutdown. Make those tasks idempotent or record their execution progress instead, because if the thread running the task has not stopped by then, the next step is kill -9, which can cause unpredictable problems.
  3. Alternatively, implement interruptible tasks (InterruptableJob) and have shutdown wait for tasks to complete. This accommodates both short and long tasks and avoids production incidents.
  • Quartz listeners
    Whether it is a job listener or a trigger listener, catch all exceptions inside the listener.
  • Use a custom thread pool?
    A custom thread pool may not interact well with Quartz. For example, my custom thread pool waited for all tasks to complete, but when a task then tried to update the database, Quartz's job store had already been shut down. Tears…
  • Draw clear boundaries: scheduling is the scheduler's concern, the work is the task's concern, and tasks should be idempotent
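
The advice on interruptible tasks that record their progress can be sketched as follows. This is plain JDK Java rather than a Quartz job, so that it runs standalone; the class CheckpointedTask and its methods are hypothetical. In a real job, requestStop() would be the body of InterruptableJob.interrupt(), and the checkpoint would be persisted somewhere durable instead of being held in a field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class CheckpointedTask {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    // Index of the next item to process; stand-in for persisted progress.
    private int checkpoint;

    public CheckpointedTask(int startFrom) {
        this.checkpoint = startFrom;
    }

    /** Asks the task to stop at the next item boundary. */
    public void requestStop() {
        stopRequested.set(true);
    }

    public int checkpoint() {
        return checkpoint;
    }

    /** Processes items from the checkpoint onward; returns true if everything was processed. */
    public boolean run(List<String> items, Consumer<String> handler) {
        while (checkpoint < items.size()) {
            if (stopRequested.get()) {
                return false; // stop between items, never in the middle of one
            }
            try {
                handler.accept(items.get(checkpoint));
            } catch (RuntimeException e) {
                // Handle failures inside the task; this sketch logs and skips the item.
                System.err.println("item failed, skipping: " + e.getMessage());
            }
            checkpoint++;
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d");
        List<String> done = new ArrayList<>();

        CheckpointedTask first = new CheckpointedTask(0);
        // Simulate a shutdown request arriving while "b" is being handled.
        boolean finished = first.run(items, item -> {
            done.add(item);
            if (item.equals("b")) {
                first.requestStop();
            }
        });
        System.out.println(finished + " at checkpoint " + first.checkpoint());

        // After a restart, a new run resumes from the saved checkpoint.
        CheckpointedTask second = new CheckpointedTask(first.checkpoint());
        System.out.println(second.run(items, done::add) + " " + done);
    }
}
```

Because the task only stops at item boundaries and resumes from the checkpoint, a shutdown that waits for it returns quickly, and a kill -9 costs at most the item that was in flight.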