Wednesday 17 June 2015

Integrating Quartz Scheduler with Java webapp on multiple nodes

Recently I needed to schedule some background jobs in a legacy web app that uses Java. I found Quartz to be very useful to accomplish my task.

Contents
  • Add dependencies
  • Logging
  • Create Job Class
  • Create scheduler on the start up using org.quartz.ee.servlet.QuartzInitializerListener
  • Create JobDetails
  • Create Triggers.
  • Put it all together
  • Clustering

Add dependencies

    
      org.quartz-scheduler
      quartz
      2.2.1
    
    
      org.quartz-scheduler
      quartz-jobs
      2.2.1
    
Enable logging in order to follow what is happening through the logs.
log4j.rootLogger=WARN, ROOT
log4j.appender.ROOT=org.apache.log4j.RollingFileAppender
log4j.logger.org.quartz=DEBUG
log4j.appender.ROOT.File=/tmp/quartz.log

Creating Job Class

package com.coffeebeans.quartz.job;

import com.coffeebeans.quartz.utils.Constants;
import com.coffeebeans.quartz.utils.DateUtils;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.Date;

/**
 * Created by muhamadto on 6/9/15.
 */
public class QuartzJob implements Job {
    private static final Logger LOGGER =  LoggerFactory.getLogger(QuartzJob.class);
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail = jobExecutionContext.getJobDetail();
        try {
            LOGGER.debug(String.format("Scheduled Job (%s) started - %s", jobDetail.getKey(), DateUtils.formatDate(Constants.DATE_FORMAT, new Date())));
        } catch (ParseException e) {
            e.printStackTrace();
            LOGGER.error("Could not execute job: {}, Exception details: {}", jobDetail.getKey(), e);
            throw new JobExecutionException(e);
        }
    }
}

Create scheduler

You can get scheduler and start it using 
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start(); 
However, a better way to accomplish the same task is to start/shutdown the scheduler with the application server. Paste the following snippet into your web.xml
    
    
        quartz:config-file
        quartz.properties
    
    
        quartz:shutdown-on-unload
        true
    
    
        quartz:wait-on-shutdown
        true
    
    
        quartz:start-on-load
        true
    

    
        
            org.quartz.ee.servlet.QuartzInitializerListener
        
    
  • Set quartz:start-on-load=true to specify that you want the scheduler.start() method to be called when the listener is first loaded. 
  • Set quartz:shutdown-on-unload=true to specify that you want scheduler.shutdown() to be called when server is being shutdown.
  • Set quartz:wait-on-shutdown to true to execute scheduler.shutdown(true). This will cause the scheduler to wait for existing jobs to complete. 
  • If you skipped quartz:config-file, the scheduler will be initialized with the default config.
Now get an instance of the scheduler. StdSchedulerFactory instance is stored into the ServletContextYou can gain access to the factory from a ServletContext.
try {
    StdSchedulerFactory factory = (StdSchedulerFactory) config.getServletContext()
        .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY);
    scheduler = factory.getScheduler();
} catch (SchedulerException e) {
    throw new ServletException();        
}

JobDetail

JobDetail is a job instance definition with its own set of properties. For example, a CleanUpJob, could have instance to clean files from tmp directory, another to clean cache, and so on. Use JobMetaData to pass parameters to your job.
JobKey jobKey = JobKey.jobKey(jobName, groupName);
JobDetail job =newJob(QuartzJob.class).withIdentity(jobKey).requestRecovery().build();

Create a trigger 

There are two types of triggers:
SimpleTrigger, used to schedule a job that will be executed only one time at a specific time, or at a specific time then repeats at a certain interval.
Trigger simpleTrigger = newTrigger()
        .withIdentity(triggerName, groupName)
        .withSchedule(simpleSchedule()
                .withRepeatCount(repeatCount)
                .withIntervalInSeconds(repeatIntervalInSeconds))
        .startAt(startTime)
        .forJob(job)
        .build();

CronTrigger, used to specify times to run a job in a manner similar to Unix cron expression.
CronExpression cexp = new CronExpression(“0 0 11 * * ?”);
Trigger cronTrigger = newTrigger()
        .withIdentity(triggerName, groupName)
        .withSchedule(cronSchedule(cronExpression))
        .forJob(job)
        .build();

Note: In quartz a trigger can fire only one job while a job can be associated with multiple triggers

Put it all together 

Now create a servlet to serve as a client in which we will call and use the job
package com.coffeebeans.quartz.servlet;

import com.coffeebeans.quartz.job.QuartzJob;
import com.coffeebeans.quartz.utils.NumberUtils;
import org.quartz.*;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Date;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

/**
 * Created by muhamadto on 6/9/15.
 */
public class QuartzServlet extends HttpServlet {
    private static final Logger LOGGER = LoggerFactory.getLogger(QuartzServlet.class);

    private Scheduler scheduler;

    @Override
    public void init(ServletConfig config) throws ServletException {
        try {
            StdSchedulerFactory factory = (StdSchedulerFactory) config.getServletContext()
                    .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY);

            scheduler = factory.getScheduler();
        } catch (SchedulerException e) {
            throw new ServletException();
        }
        super.init(config);
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        try {
            String jobName = req.getParameter("jobName");
            String groupName = req.getParameter("groupName");

            String mills = req.getParameter("mills");

            Date startTimeForSimpleTrigger = null;
            if (!"".equals(mills) && NumberUtils.isNumeric(mills)) { // isNumeric copied from commons-lang StringUtils
                startTimeForSimpleTrigger = new Date(Long.parseLong(mills));
            }

            JobKey jobKey = JobKey.jobKey(jobName, groupName);
            JobDetail job = newJob(QuartzJob.class).withIdentity(jobKey).requestRecovery().build();

            String triggerName = String.format("simple-%s", jobKey);
            Trigger simpleTrigger = newTrigger()
                    .withIdentity(triggerName, groupName)
                    .withSchedule(simpleSchedule()
                            .withRepeatCount(0)
                            .withIntervalInSeconds(0))
                    .startAt(startTimeForSimpleTrigger)
                    .forJob(job)
                    .build();

            boolean jobExists = scheduler.checkExists(job.getKey());
            if (!jobExists) {
                scheduler.scheduleJob(job, simpleTrigger);
            } else {
                scheduler.rescheduleJob(simpleTrigger.getKey(), simpleTrigger);
            }
        } catch (Exception e) {
            LOGGER.error("Job was not scheduled", e);
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, String.format("Failed! Job was not saved. %s", e.getMessage()));
        }
    }

    @Override
    protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    }
}
Map the servlet in web.xml
    
        QuartzServlet
        com.coffeebeans.quartz.servlet.QuartzServlet
    
    
        QuartzServlet
        /schedule
    
  • rescheduleJob() will cause the scheduler to remove the old trigger with the given key, and put the new one in its place.
  • Calling requestRecovery() will cause the scheduler to re-executea job in case of server hard-shutdown.
Quick things to notice here
  • For the recover to work we will need to use datastore. 
  • Set the DB properties in quartz.properties and find the db script for mysql in /path/to/quartz-2.2.1/docs/dbTables/tables_mysql.sql

Clustering, and Datastore Configuration

In /path/to/tomcat/context.xml
  
Then add quartz.properties
#============================================================================
# Configure Main Scheduler Properties
#============================================================================

org.quartz.scheduler.instanceName=CoffeeBeansClusteredScheduler
org.quartz.scheduler.instanceId=AUTO

#============================================================================
# Configure ThreadPool
#============================================================================

org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=25
org.quartz.threadPool.threadPriority=5

#============================================================================
# Configure JobStore
#============================================================================

org.quartz.jobStore.misfireThreshold=60000

org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.lockHandler.class=org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore
org.quartz.jobStore.lockHandler.updateLockRowSQL=UPDATE {0}LOCKS SET LOCK_NAME = LOCK_NAME WHERE LOCK_NAME = ?
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000

#============================================================================
# Configure Datasources
#============================================================================

org.quartz.dataSource.myDS.jndiURL=java:comp/env/jdbc/quartz
#============================================================================
# Logging
#============================================================================

org.quartz.plugin.triggerHistory.class=org.quartz.plugins.history.LoggingTriggerHistoryPlugin
org.quartz.plugin.triggerHistory.class=org.quartz.plugins.history.LoggingTriggerHistoryPlugin
org.quartz.plugin.triggerHistory.triggerFiredMessage=Trigger [{1}.{0}] fired job [{6}.{5}] scheduled at: {2, date, dd-MM-yyyy HH:mm:ss.SSS}, next scheduled at: {3, date, dd-MM-yyyy HH:mm:ss.SSS}
org.quartz.plugin.triggerHistory.triggerCompleteMessage=Trigger [{1}.{0}] completed firing job [{6}.{5}] with resulting trigger instruction code: {9}. Next scheduled at: {3, date, dd-MM-yyyy HH:mm:ss.SSS}
org.quartz.plugin.triggerHistory.triggerMisfiredMessage=Trigger [{1}.{0}] misfired job [{6}.{5}]. Should have fired at: {3, date, dd-MM-yyyy HH:mm:ss.SSS}
org.quartz.plugin.jobHistory.class=org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.jobHistory.jobToBeFiredMessage=Job [{1}.{0}] to be fired by trigger [{4}.{3}], re-fire: {7}
org.quartz.plugin.jobHistory.jobSuccessMessage=Job [{1}.{0}] execution complete and reports: {8}
org.quartz.plugin.jobHistory.jobFailedMessage=Job [{1}.{0}] execution failed with exception: {8}
org.quartz.plugin.jobHistory.jobWasVetoedMessage=Job [{1}.{0}] was vetoed. It was to be fired by trigger [{4}.{3}] at: {2, date, dd-MM-yyyy HH:mm:ss.SSS}
  • Set org.quartz.jobStore.isClustered=true to indicate that you want to activate cluster mode. 
  • Make sure org.quartz.scheduler.instanceId=AUTO; otherwise you will need to have multiple configuration files.