
Wednesday, 17 June 2015

Integrating Quartz Scheduler with Java webapp on multiple nodes

Recently I needed to schedule some background jobs in a legacy web app that uses Java. I found Quartz to be very useful to accomplish my task.

Contents
  • Add dependencies
  • Logging
  • Create Job Class
  • Create scheduler on the start up using org.quartz.ee.servlet.QuartzInitializerListener
  • Create JobDetails
  • Create Triggers
  • Put it all together
  • Clustering

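Add dependencies

    <dependency>
      <groupId>org.quartz-scheduler</groupId>
      <artifactId>quartz</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.quartz-scheduler</groupId>
      <artifactId>quartz-jobs</artifactId>
      <version>2.2.1</version>
    </dependency>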
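
Logging

Enable logging so you can follow what Quartz is doing through the logs (log4j.properties):
log4j.rootLogger=WARN, ROOT
log4j.appender.ROOT=org.apache.log4j.RollingFileAppender
log4j.appender.ROOT.File=/tmp/quartz.log
log4j.appender.ROOT.layout=org.apache.log4j.PatternLayout
log4j.appender.ROOT.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c - %m%n
log4j.logger.org.quartz=DEBUG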

Creating Job Class

package com.coffeebeans.quartz.job;

import com.coffeebeans.quartz.utils.Constants;
import com.coffeebeans.quartz.utils.DateUtils;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.Date;

/**
 * Created by muhamadto on 6/9/15.
 */
public class QuartzJob implements Job {
    private static final Logger LOGGER =  LoggerFactory.getLogger(QuartzJob.class);
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail = jobExecutionContext.getJobDetail();
        try {
            LOGGER.debug(String.format("Scheduled Job (%s) started - %s", jobDetail.getKey(), DateUtils.formatDate(Constants.DATE_FORMAT, new Date())));
        } catch (ParseException e) {
            // Pass the exception as the last argument so SLF4J logs the full stack trace.
            LOGGER.error("Could not execute job: {}", jobDetail.getKey(), e);
            throw new JobExecutionException(e);
        }
    }
}

Create scheduler

You can get a scheduler and start it with:
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start();
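However, a better approach is to start and shut down the scheduler together with the application server. Add the following snippet to your web.xml:
    <context-param>
        <param-name>quartz:config-file</param-name>
        <param-value>quartz.properties</param-value>
    </context-param>
    <context-param>
        <param-name>quartz:shutdown-on-unload</param-name>
        <param-value>true</param-value>
    </context-param>
    <context-param>
        <param-name>quartz:wait-on-shutdown</param-name>
        <param-value>true</param-value>
    </context-param>
    <context-param>
        <param-name>quartz:start-on-load</param-name>
        <param-value>true</param-value>
    </context-param>

    <listener>
        <listener-class>org.quartz.ee.servlet.QuartzInitializerListener</listener-class>
    </listener>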
  • Set quartz:start-on-load=true so that scheduler.start() is called when the listener is first loaded.
  • Set quartz:shutdown-on-unload=true so that scheduler.shutdown() is called when the server shuts down.
  • Set quartz:wait-on-shutdown=true to call scheduler.shutdown(true) instead, which makes the scheduler wait for running jobs to complete.
  • If you omit quartz:config-file, the scheduler is initialized with the default configuration.
Now get an instance of the scheduler. The listener stores the StdSchedulerFactory instance in the ServletContext, so you can retrieve the factory from there:
try {
    StdSchedulerFactory factory = (StdSchedulerFactory) config.getServletContext()
        .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY);
    scheduler = factory.getScheduler();
} catch (SchedulerException e) {
    throw new ServletException("Could not obtain the Quartz scheduler", e);
}

JobDetail

A JobDetail is the definition of one job instance, with its own set of properties. For example, a CleanUpJob class could have one instance that cleans files from the tmp directory, another that clears a cache, and so on. Use a JobDataMap to pass parameters to your job.
JobKey jobKey = JobKey.jobKey(jobName, groupName);
JobDetail job = newJob(QuartzJob.class).withIdentity(jobKey).requestRecovery().build();
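
To make the idea of several instances of one job class concrete, here is a minimal sketch that attaches parameters through Quartz's JobDataMap and reads them back inside the job. The CleanUpJob class, the "cleanup" group and the targetDir/maxAgeDays keys are hypothetical names chosen for illustration; only the Quartz builder and JobDataMap calls come from the library.

```java
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;

import static org.quartz.JobBuilder.newJob;

// Hypothetical job used only for this illustration. In a real application it
// should be a public top-level class so that Quartz can instantiate it.
class CleanUpJob implements Job {
    @Override
    public void execute(JobExecutionContext context) {
        // Parameters attached to the JobDetail are read back at execution time.
        JobDataMap data = context.getJobDetail().getJobDataMap();
        String targetDir = data.getString("targetDir");
        int maxAgeDays = data.getInt("maxAgeDays");
        System.out.printf("Cleaning %s (files older than %d days)%n", targetDir, maxAgeDays);
    }
}

class JobDataExample {
    // Builds one definition (instance) of CleanUpJob with its own parameters.
    static JobDetail cleanUpJobFor(String name, String targetDir, int maxAgeDays) {
        return newJob(CleanUpJob.class)
                .withIdentity(JobKey.jobKey(name, "cleanup"))
                .usingJobData("targetDir", targetDir)
                .usingJobData("maxAgeDays", maxAgeDays)
                .build();
    }

    public static void main(String[] args) {
        // Two instances of the same job class, each with different parameters.
        JobDetail tmpCleaner = cleanUpJobFor("tmp-cleaner", "/tmp", 7);
        JobDetail cacheCleaner = cleanUpJobFor("cache-cleaner", "/var/cache/app", 1);
        System.out.println(tmpCleaner.getKey() + " -> " + tmpCleaner.getJobDataMap().getString("targetDir"));
        System.out.println(cacheCleaner.getKey() + " -> " + cacheCleaner.getJobDataMap().getString("targetDir"));
    }
}
```

Keeping the values in the JobDataMap to strings and primitives is a reasonable default when the job store is a database, since the map is persisted along with the job.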

Create a trigger 

The two most commonly used trigger types are:
SimpleTrigger, used to schedule a job that runs once at a specific time, or that starts at a specific time and then repeats at a fixed interval.
Trigger simpleTrigger = newTrigger()
        .withIdentity(triggerName, groupName)
        .withSchedule(simpleSchedule()
                .withRepeatCount(repeatCount)
                .withIntervalInSeconds(repeatIntervalInSeconds))
        .startAt(startTime)
        .forJob(job)
        .build();
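
How repeatCount and the interval translate into actual firings is easy to get wrong by one: the trigger fires once at the start time and then repeatCount more times. The sketch below illustrates that arithmetic with plain JDK classes. It is not Quartz code, and the class and method names are made up for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class SimpleScheduleSketch {
    // Fire times of a simple schedule: one firing at startTime,
    // followed by repeatCount further firings spaced by interval.
    static List<Instant> fireTimes(Instant startTime, int repeatCount, Duration interval) {
        List<Instant> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(startTime.plus(interval.multipliedBy(i)));
        }
        return times;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2015-06-17T11:00:00Z");
        // repeatCount = 2 with a 30-second interval gives three firings in total.
        fireTimes(start, 2, Duration.ofSeconds(30)).forEach(System.out::println);
    }
}
```

With repeatCount set to 0, as in the servlet later in this post, the job runs exactly once at the start time.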

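CronTrigger, used to run a job on a calendar-like schedule written in a syntax similar to Unix cron expressions.
// The CronExpression constructor throws java.text.ParseException for an invalid expression.
CronExpression cronExpression = new CronExpression("0 0 11 * * ?");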
Trigger cronTrigger = newTrigger()
        .withIdentity(triggerName, groupName)
        .withSchedule(cronSchedule(cronExpression))
        .forJob(job)
        .build();
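
Quartz cron expressions differ from Unix cron in that they start with a seconds field and accept an optional year field at the end, so the expression 0 0 11 * * ? means every day at 11:00:00. The following sketch only labels the fields by position to make that layout visible. It is plain JDK code with made-up class and method names, and it does not validate or evaluate the expression the way Quartz does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CronFieldsSketch {
    // Field order of a Quartz cron expression; the seventh field (year) is optional.
    private static final String[] FIELD_NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    // Splits the expression on whitespace and labels each field by its position.
    static Map<String, String> labelFields(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("Expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> labeled = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            labeled.put(FIELD_NAMES[i], parts[i]);
        }
        return labeled;
    }

    public static void main(String[] args) {
        labelFields("0 0 11 * * ?")
                .forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```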

Note: In Quartz, a trigger can fire only one job, while a job can be associated with multiple triggers.

Put it all together 

Now create a servlet that acts as a client and schedules the job:
package com.coffeebeans.quartz.servlet;

import com.coffeebeans.quartz.job.QuartzJob;
import com.coffeebeans.quartz.utils.NumberUtils;
import org.quartz.*;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Date;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

/**
 * Created by muhamadto on 6/9/15.
 */
public class QuartzServlet extends HttpServlet {
    private static final Logger LOGGER = LoggerFactory.getLogger(QuartzServlet.class);

    private Scheduler scheduler;

    @Override
    public void init(ServletConfig config) throws ServletException {
        try {
            StdSchedulerFactory factory = (StdSchedulerFactory) config.getServletContext()
                    .getAttribute(QuartzInitializerListener.QUARTZ_FACTORY_KEY);

            scheduler = factory.getScheduler();
        } catch (SchedulerException e) {
            throw new ServletException("Could not obtain the Quartz scheduler", e);
        }
        super.init(config);
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        try {
            String jobName = req.getParameter("jobName");
            String groupName = req.getParameter("groupName");

            String mills = req.getParameter("mills");

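            // Default to "now": building a trigger with a null start time throws IllegalArgumentException.
            Date startTimeForSimpleTrigger = new Date();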
            if (!"".equals(mills) && NumberUtils.isNumeric(mills)) { // isNumeric copied from commons-lang StringUtils
                startTimeForSimpleTrigger = new Date(Long.parseLong(mills));
            }

            JobKey jobKey = JobKey.jobKey(jobName, groupName);
            JobDetail job = newJob(QuartzJob.class).withIdentity(jobKey).requestRecovery().build();

            String triggerName = String.format("simple-%s", jobKey);
            Trigger simpleTrigger = newTrigger()
                    .withIdentity(triggerName, groupName)
                    .withSchedule(simpleSchedule()
                            .withRepeatCount(0)
                            .withIntervalInSeconds(0))
                    .startAt(startTimeForSimpleTrigger)
                    .forJob(job)
                    .build();

            boolean jobExists = scheduler.checkExists(job.getKey());
            if (!jobExists) {
                scheduler.scheduleJob(job, simpleTrigger);
            } else {
                scheduler.rescheduleJob(simpleTrigger.getKey(), simpleTrigger);
            }
        } catch (Exception e) {
            LOGGER.error("Job was not scheduled", e);
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, String.format("Failed! Job was not saved. %s", e.getMessage()));
        }
    }

    @Override
    protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    }
}
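Map the servlet in web.xml:
    <servlet>
        <servlet-name>QuartzServlet</servlet-name>
        <servlet-class>com.coffeebeans.quartz.servlet.QuartzServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>QuartzServlet</servlet-name>
        <url-pattern>/schedule</url-pattern>
    </servlet-mapping>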
  • rescheduleJob() removes the old trigger with the given key and puts the new one in its place.
  • Calling requestRecovery() tells the scheduler to re-execute the job if the server suffers a hard shutdown while the job is running.
A few things to note here:
  • For recovery to work, jobs must be kept in a persistent datastore.
  • Set the DB properties in quartz.properties. The MySQL table-creation script is at /path/to/quartz-2.2.1/docs/dbTables/tables_mysql.sql

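Clustering and Datastore Configuration

In /path/to/tomcat/context.xml, declare a JNDI DataSource resource named jdbc/quartz that points to the Quartz database. This is the name referenced by org.quartz.dataSource.myDS.jndiURL in quartz.properties.
Then add quartz.properties: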
#============================================================================
# Configure Main Scheduler Properties
#============================================================================

org.quartz.scheduler.instanceName=CoffeeBeansClusteredScheduler
org.quartz.scheduler.instanceId=AUTO

#============================================================================
# Configure ThreadPool
#============================================================================

org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=25
org.quartz.threadPool.threadPriority=5

#============================================================================
# Configure JobStore
#============================================================================

org.quartz.jobStore.misfireThreshold=60000

org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.lockHandler.class=org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore
org.quartz.jobStore.lockHandler.updateLockRowSQL=UPDATE {0}LOCKS SET LOCK_NAME = LOCK_NAME WHERE LOCK_NAME = ?
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000

#============================================================================
# Configure Datasources
#============================================================================

org.quartz.dataSource.myDS.jndiURL=java:comp/env/jdbc/quartz

#============================================================================
# Logging
#============================================================================

org.quartz.plugin.triggerHistory.class=org.quartz.plugins.history.LoggingTriggerHistoryPlugin
org.quartz.plugin.triggerHistory.triggerFiredMessage=Trigger [{1}.{0}] fired job [{6}.{5}] scheduled at: {2, date, dd-MM-yyyy HH:mm:ss.SSS}, next scheduled at: {3, date, dd-MM-yyyy HH:mm:ss.SSS}
org.quartz.plugin.triggerHistory.triggerCompleteMessage=Trigger [{1}.{0}] completed firing job [{6}.{5}] with resulting trigger instruction code: {9}. Next scheduled at: {3, date, dd-MM-yyyy HH:mm:ss.SSS}
org.quartz.plugin.triggerHistory.triggerMisfiredMessage=Trigger [{1}.{0}] misfired job [{6}.{5}]. Should have fired at: {3, date, dd-MM-yyyy HH:mm:ss.SSS}
org.quartz.plugin.jobHistory.class=org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.jobHistory.jobToBeFiredMessage=Job [{1}.{0}] to be fired by trigger [{4}.{3}], re-fire: {7}
org.quartz.plugin.jobHistory.jobSuccessMessage=Job [{1}.{0}] execution complete and reports: {8}
org.quartz.plugin.jobHistory.jobFailedMessage=Job [{1}.{0}] execution failed with exception: {8}
org.quartz.plugin.jobHistory.jobWasVetoedMessage=Job [{1}.{0}] was vetoed. It was to be fired by trigger [{4}.{3}] at: {2, date, dd-MM-yyyy HH:mm:ss.SSS}
  • Set org.quartz.jobStore.isClustered=true to turn on cluster mode.
  • Make sure org.quartz.scheduler.instanceId=AUTO so that each node gets a unique instance ID automatically; otherwise every node needs its own configuration file with a distinct instanceId.