Big Data/Analytics Zone is brought to you in partnership with:

I am a software architect and a loud-mouth on the design and development of enterprise software. I am also a visionary who loves Big Data and start-ups. Chris is a DZone MVB and is not an employee of DZone and has posted 11 posts at DZone. You can read more from them at their website. View Full User Profile

An Enterprise Use Case for Spring Batch

12.26.2012
| 9134 views |
  • submit to reddit

In our firm, we had a use case to process some mainframe-generated files. In this post I would like to describe how we implemented the use case using Spring Batch.

Use Case: The mainframe generates some files (which we call kuba files) and puts them on a network drive that our application can read, rename, and delete.

For an overview, let's start with our XML configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
						http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <!-- Spring Batch job repository: job metadata is stored in appDataSource, with jtaTransactionManager handling transactions. -->
    <job-repository id="jobRepository"
                    data-source="appDataSource"
                    transaction-manager="jtaTransactionManager"
                    xmlns="http://www.springframework.org/schema/batch"
            />
    <!-- Main job of this use case. restartable="true" allows an instance whose last execution FAILED to be
         launched again with the same parameters, which is what SchedulingLauncher does. -->
    <job id="userStoreProvisioning" xmlns="http://www.springframework.org/schema/batch" job-repository="jobRepository" restartable="true">
        <!-- Step 1: read items from the input files, run them through the processors (validation),
             then write them to the database and to JMS. commit-interval="1" commits after every item. -->
        <step id="readWrite" next="clean">
            <tasklet transaction-manager="jtaTransactionManager">
                <chunk
                        reader="userStoreMultiResourceReader"
                        processor="userStoreProcessors"
                        writer="userStoreItemWriter"
                        commit-interval="1">
                    <!-- NOTE(review): the reader bean is step-scoped; it is presumably registered here explicitly so its
                         ItemStream callbacks (open/update/close and saved state) still run through the scoped proxy. -->
                    <streams>
                        <stream ref="userStoreMultiResourceReader" />
                    </streams>
                </chunk>
            </tasklet>
        </step>
        <!-- Step 2: delete the files that the job processed. -->
        <step id="clean">
            <tasklet ref="userStoreBatchCleanupTasklet" transaction-manager="jtaTransactionManager"/>
        </step>
        <!-- Logs a one-line summary once the job has finished. -->
        <listeners>
            <listener ref="userStoreProvisionListener"/>
        </listeners>
    </job>

    <!-- Cleanup tasklet for step "clean"; it scans the same directory the reader reads from. -->
    <bean id="userStoreBatchCleanupTasklet" class="UserStoreBatchCleanupTasklet"
          p:directoryResource="${userStoreProvisioning.multiResourceReader.directory}"/>

    <!-- Job listener that logs the after-job summary. -->
    <bean id="userStoreProvisionListener" class="UserStoreProvisionListener" />

    <!-- Every item is handed to each delegate in turn: first the JDBC writer, then the JMS writer. -->
    <bean id="userStoreItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
        <property name="delegates">
            <list>
                <!-- NOTE(review): a bean named userStoreDataSource is injected into a property named jdbcTemplate;
                     confirm its type matches what UserStoreJdbcItemWriter expects. -->
                <bean class="UserStoreJdbcItemWriter">
                    <property name="jdbcTemplate" ref="userStoreDataSource"/>
                </bean>
                <bean class="UserStoreJmsItemWriter"/>
            </list>
        </property>
    </bean>

    <!-- Processor chain; currently a single validating processor backed by MainframeBeanWrapperValidator. -->
    <bean id="userStoreProcessors" class="org.springframework.batch.item.support.CompositeItemProcessor">
        <property name="delegates">
            <list>
                <bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
                    <property name="validator">
                        <bean class="MainframeBeanWrapperValidator"/>
                    </property>
                </bean>
            </list>
        </property>
    </bean>

    <!-- Custom MultiResourceItemReader (source below). scope="step" creates a new reader, together with its inner
         ResourceFilter, for each step execution. Keep resourcePredicate declared before resources: the custom
         reader applies the filter inside setResources(). -->
    <bean id="userStoreMultiResourceReader" class="MultiResourceItemReader" scope="step" >
        <property name="resourcePredicate">
            <bean class="ResourceFilter" />
        </property>
        <!-- Resource location: the directory property concatenated with the file pattern property. -->
        <property name="resources"
                  value="${userStoreProvisioning.multiResourceReader.directory}${userStoreProvisioning.multiResourceReader.resources}"/>
        <!-- Determines the order in which the matched files are read. -->
        <property name="comparator">
            <bean class="MultiResourceReaderComparator"/>
        </property>
        <property name="delegate" ref="userStoreItemReader"/>
        <!-- Persist the reader position so a restarted execution can resume from it. -->
        <property name="saveState" value="true" />
    </bean>


    <!-- Per-file delegate of the multi-resource reader. -->
    <bean id="userStoreItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="prototype">
        <!-- Used to determine where the line endings are and do things like continue over a line ending if inside a quoted string. -->
        <property name="recordSeparatorPolicy">
            <bean class="org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy"/>
        </property>
        <property name="saveState" value="true" />
        <!-- Interface for mapping lines (strings) to domain objects typically used to map lines read from a file to domain objects on a per line basis. -->
        <property name="lineMapper">
            <bean id="userStoreLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

                <!-- A LineTokenizer implementation that splits the input String on a configurable delimiter. -->
                <!-- Each line is split on ";" into three columns: userId, actionType, userType (in that order). -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value=";"/>
                        <property name="names" value="userId,actionType,userType"/>
                    </bean>
                </property>

                <!--Implementation based on bean property paths-->
                <!-- Column names are bound to the same-named properties of a mainframeBeanWrapper instance. -->
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="mainframeBeanWrapper"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Prototype scope so the field set mapper obtains a fresh object for every line. -->
    <bean id="mainframeBeanWrapper" class="MainframeBeanWrapper" scope="prototype"/>

</beans>

A few items in the XML file above need more explanation:
Item Description
jobRepository
the job repository is a key element of the Spring Batch infrastructure because it provides some metadata about batch processing.
userStoreProvisioning
This is the main job of our use case.
steps There are two steps involved in this use case. The first step (readWrite) is in charge of reading from the file and persisting into the database, as well as sending a message to the appropriate JMS destination. The last step (clean) is in charge of deleting the processed files.
 userStoreMultiResourceReader  This is our class which extends org.springframework.batch.item.file.MultiResourceItemReader; it adds a feature to Spring Batch that we call a resource filter. With the filter we can easily control which files are processed first. For instance, we want to process the failed ones first, and then the other files.

Inside this class we use a comparator to sort the files.
 userStoreItemReader  This is a pre-built Spring Batch reader that reads the values from the file. The XML is largely self-describing, so the only point worth calling out is saveState: it keeps track of the reading position, so that when a failed job is restarted, Batch resumes from the last saved state.
 userStoreItemWriter  Our use case requires writing each row read from the file to both the database and JMS, so we use a CompositeItemWriter.

 UserStoreJdbcItemWriter  It is a custom ItemWriter for JDBC. We needed it because sometimes we have to insert a row and other times update an existing one.
 userStoreBatchCleanupTasklet  It is a custom tasklet to delete the processed files.
Below is the source code for some of the items mentioned above.

MultiResourceItemReader

/**
 * Variant of {@link org.springframework.batch.item.file.MultiResourceItemReader} that can filter its resources
 * before handing them to the superclass.
 *
 * <p>The filtering rules are:</p>
 * <ul>
 *   <li>When a {@link ResourcePredicate} has been set and {@code isPredicateActive()} returns {@code true}, only
 *   resources for which {@code evaluate(resource)} returns {@code true} are kept.</li>
 *   <li>When no predicate has been set, or the predicate is inactive, every resource is passed through.</li>
 * </ul>
 *
 * <p>Filtering happens inside {@link #setResources(Resource[])}. The predicate must therefore be injected before
 * the resources; setting a predicate afterwards does not re-filter resources that were already set.</p>
 *
 * @author Chris Shayan
 */
public class MultiResourceItemReader<T> extends org.springframework.batch.item.file.MultiResourceItemReader<T> {
    ResourcePredicate<Resource> resourcePredicate;

    /**
     * Filters {@code resources} through the configured predicate (when present and active) and passes the
     * remaining resources to the superclass as a new array.
     *
     * @param resources candidate resources; must not be {@code null} (checked with {@code Validate.notNull})
     */
    @Override
    public void setResources(Resource[] resources) {
        Validate.notNull(resources, "The resources must not be null");

        // No predicate injected (or predicate inactive): nothing to filter. Previously a missing predicate caused an NPE.
        if (resourcePredicate == null || !resourcePredicate.isPredicateActive()) {
            // Hand the superclass its own copy, as before, so later changes to the caller's array do not leak in.
            super.setResources(resources.clone());
            return;
        }

        final List<Resource> accepted = new ArrayList<Resource>(resources.length);
        for (Resource resource : resources) {
            if (resourcePredicate.evaluate(resource)) {
                accepted.add(resource);
            }
        }
        super.setResources(accepted.toArray(new Resource[accepted.size()]));
    }

    /**
     * Sets the predicate used by {@link #setResources(Resource[])} to filter resources.
     *
     * @param resourcePredicate the filter; must not be {@code null} (checked with {@code Validate.notNull})
     */
    public void setResourcePredicate(ResourcePredicate<Resource> resourcePredicate) {
        Validate.notNull(resourcePredicate);

        this.resourcePredicate = resourcePredicate;
    }
}

ResourceFilter

/**
 * Decides which resource files should be processed in the current execution cycle.
 *
 * <p>{@link #isPredicateActive()} inspects the most recent {@code userStoreProvisioning} job instance and its most
 * recent <em>finished</em> execution. If that execution ended with exit code {@code FAILED}, the following happens:</p>
 * <ul>
 *   <li>the instance's job parameters are remembered and the predicate becomes active;</li>
 *   <li>{@link #evaluate(Resource)} then accepts only files whose name exactly matches one of the comma-separated
 *   entries stored under {@link SchedulingLauncher#USERSTORE_FILE_NAMES_KEY}.</li>
 * </ul>
 * <p><b>Warning:</b> This class should not be singleton at all. It keeps state between
 * {@link #isPredicateActive()} and {@link #evaluate(Resource)}.</p>
 *
 * @author Chris Shayan
 */
public class ResourceFilter implements ResourcePredicate<Resource> {
    @Autowired
    JobExplorer jobExplorer;

    /** Job parameters of the failed instance found by the last {@link #isPredicateActive()} call; {@code null} otherwise. */
    private JobParameters lastJobInstanceJobParameters;

    /**
     * Accepts a resource only if its file name is one of the file names recorded for the failed job instance.
     *
     * @param object the candidate resource; must be non-null and have a non-empty file name
     * @return {@code true} if the file name exactly matches an entry of the recorded file-name list
     */
    @Override
    public boolean evaluate(Resource object) {
        Validate.notNull(object);
        Validate.notEmpty(object.getFilename());
        Validate.notNull(lastJobInstanceJobParameters, "isPredicateActive() must return true before evaluate() is called");

        final String fileNames = lastJobInstanceJobParameters.getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
        // Exact match per entry: a plain substring test would let "kuba1" match a list containing "kuba10".
        return containsFileName(fileNames, object.getFilename());
    }

    /**
     * Returns {@code true} when the latest finished execution of the latest job instance failed. In that case the
     * instance's job parameters are remembered for {@link #evaluate(Resource)}.
     *
     * @return whether resources should be filtered for this cycle
     */
    @Override
    public boolean isPredicateActive() {
        lastJobInstanceJobParameters = null; // never reuse the outcome of an earlier call
        final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(SchedulingLauncher.USER_STORE_PROVISIONING, 0, 1);
        if (CollectionUtils.isNotEmpty(userStoreJobInstances)) {
            final JobInstance lastJobInstance = userStoreJobInstances.get(0);
            final JobExecution lastFinishedExecution = findLatestFinishedExecution(jobExplorer.getJobExecutions(lastJobInstance));
            if (lastFinishedExecution != null && hasFailed(lastFinishedExecution)) {
                lastJobInstanceJobParameters = lastJobInstance.getJobParameters();
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the first execution in {@code jobExecutions} that is no longer running, or {@code null}.
     *
     * <p>This filter is created inside a step-scoped bean, so the newest execution is normally the one currently in
     * progress; it must be skipped to reach the attempt that actually ended. NOTE(review): the list is assumed to
     * be ordered newest first, as the original code assumed with {@code get(0)}; confirm.</p>
     */
    private static JobExecution findLatestFinishedExecution(List<JobExecution> jobExecutions) {
        if (jobExecutions == null) {
            return null;
        }
        for (JobExecution execution : jobExecutions) {
            if (!execution.isRunning()) {
                return execution;
            }
        }
        return null;
    }

    /**
     * Checks the exit code only. {@code ExitStatus.equals} also compares the exit description, and a failed
     * execution typically carries one (e.g. the exception), so {@code ExitStatus.FAILED.equals(...)} would not match.
     */
    private static boolean hasFailed(JobExecution execution) {
        final ExitStatus exitStatus = execution.getExitStatus();
        return exitStatus != null && ExitStatus.FAILED.getExitCode().equals(exitStatus.getExitCode());
    }

    /** Returns whether {@code fileName} equals one of the comma-separated entries; {@code null} lists contain nothing. */
    private static boolean containsFileName(String commaSeparatedNames, String fileName) {
        if (commaSeparatedNames == null) {
            return false;
        }
        for (String name : commaSeparatedNames.split(",")) {
            if (name.equals(fileName)) {
                return true;
            }
        }
        return false;
    }
}

MainframeBeanWrapper

/**
 * Mutable holder for the columns of one mainframe file line: user id, user type and action type.
 * All properties start as {@code null} and are filled in through the setters.
 */
public class MainframeBeanWrapper {
    private String userId;
    private String userType;
    private String actionType;

    /**
     * @return the user id column, or {@code null} if not set
     */
    public String getUserId() {
        return this.userId;
    }

    /**
     * @param userId new value for the user id column
     */
    public void setUserId(final String userId) {
        this.userId = userId;
    }

    /**
     * @return the user type column, or {@code null} if not set
     */
    public String getUserType() {
        return this.userType;
    }

    /**
     * @param userType new value for the user type column
     */
    public void setUserType(final String userType) {
        this.userType = userType;
    }

    /**
     * @return the action type column, or {@code null} if not set
     */
    public String getActionType() {
        return this.actionType;
    }

    /**
     * @param actionType new value for the action type column
     */
    public void setActionType(final String actionType) {
        this.actionType = actionType;
    }

    /**
     * Renders the three columns as {@code userId: X, userType: Y, actionType: Z}; unset values print as {@code null}.
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append("userId: ").append(userId)
                .append(", userType: ").append(userType)
                .append(", actionType: ").append(actionType)
                .toString();
    }
}

UserStoreBatchCleanupTasklet

/**
 * This step is in charge of deleting the processed files.
 *
 * <p>An entry directly inside {@code directoryResource} is deleted only when its name exactly equals one of the
 * comma-separated names stored in the {@link SchedulingLauncher#USERSTORE_FILE_NAMES_KEY} job parameter.</p>
 *
 * @author Chris Shayan
 */
public class UserStoreBatchCleanupTasklet implements Tasklet, InitializingBean {
    private Resource directoryResource;

    /**
     * Deletes every processed file found in the configured directory.
     *
     * <p>Nothing is deleted in these cases:</p>
     * <ul>
     *   <li>the resource is not a directory;</li>
     *   <li>the job parameter is missing or empty;</li>
     *   <li>the directory cannot be listed.</li>
     * </ul>
     *
     * @param contribution unused
     * @param chunkContext used to read the job parameters of the running step
     * @return {@link RepeatStatus#FINISHED}; the tasklet runs once
     * @throws Exception propagated from {@code Resource#getFile()} or {@code FileUtils.forceDelete}
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        final File directory = directoryResource.getFile();
        if (!directory.isDirectory()) {
            return RepeatStatus.FINISHED;
        }

        // Read the job parameter once instead of once per directory entry.
        final String fileNames = chunkContext
                .getStepContext()
                .getStepExecution()
                .getJobParameters()
                .getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
        if (fileNames == null || fileNames.length() == 0) {
            return RepeatStatus.FINISHED; // no processed files recorded, so nothing may be deleted
        }

        final File[] processedFiles = directory.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathName) {
                // Exact match: a substring test would delete "kuba1" when only "kuba10" was processed.
                return containsFileName(fileNames, pathName.getName());
            }
        });

        // File.listFiles returns null when the directory cannot be read.
        if (processedFiles != null) {
            for (File processedFile : processedFiles) {
                FileUtils.forceDelete(processedFile);
            }
        }
        return RepeatStatus.FINISHED;
    }

    /**
     * Fails fast when the directory has not been configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Validate.notNull(directoryResource, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]");
    }

    /**
     * @param directory directory containing the input files
     */
    public void setDirectoryResource(Resource directory) {
        this.directoryResource = directory;
    }

    /** Returns whether {@code fileName} equals one of the comma-separated entries in {@code commaSeparatedNames}. */
    private static boolean containsFileName(String commaSeparatedNames, String fileName) {
        for (String name : commaSeparatedNames.split(",")) {
            if (name.equals(fileName)) {
                return true;
            }
        }
        return false;
    }

}

UserStoreProvisionListener

/**
 * {@link JobExecutionListener} that, in {@link JobExecutionListener#afterJob(org.springframework.batch.core.JobExecution)},
 * logs a one-line summary in the format:
 * Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]
 *
 * @author Chris Shayan
 */
public class UserStoreProvisionListener implements JobExecutionListener {
    private static final Logger LOG = Logger.getLogger(UserStoreProvisionListener.class);
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd-HH.mm.ss");
    /** Printed in place of a timestamp the execution never recorded (e.g. a job that failed before it started). */
    private static final String NOT_AVAILABLE = "n/a";

    /**
     * Intentionally empty: only the after-job summary is logged.
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    /**
     * Logs the summary when the job instance was given a non-blank file-name list.
     *
     * @param jobExecution See {@link JobExecution}
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        final String fileNames = jobExecution.getJobInstance().getJobParameters().getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
        if (StringUtils.isNotBlank(fileNames)) { // If no files were processed, there is nothing to log.
            log(jobExecution, fileNames);
        }
    }

    /**
     * Logs the summary message at a level based on {@link BatchStatus}. It uses {@link Logger#info(Object)} for
     * {@link BatchStatus#COMPLETED} and {@link Logger#error(Object)} for any other status. A missing start or end time
     * is printed as {@code n/a} instead of failing.
     *
     * @param jobExecution See {@link JobExecution}
     * @param fileNames The file(s) which have been processed by this spring batch job.
     */
    public void log(JobExecution jobExecution, String fileNames) {
        final BatchStatus status = jobExecution.getStatus();
        final String startTime = formatTime(jobExecution.getStartTime());
        final String endTime = formatTime(jobExecution.getEndTime());
        final String jobName = jobExecution.getJobInstance().getJobName();

        final String logMessage = MessageFormat.format("Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]",
                jobName, startTime, endTime, fileNames);

        if (BatchStatus.COMPLETED.equals(status)) {
            LOG.info(logMessage);
        } else {
            LOG.error(logMessage);
        }
    }

    /** Formats {@code time} with {@link #DATETIME_FORMATTER}, or returns {@link #NOT_AVAILABLE} when it is {@code null}. */
    private static String formatTime(java.util.Date time) {
        return time == null ? NOT_AVAILABLE : DATETIME_FORMATTER.print(new DateTime(time.getTime()));
    }

}

SchedulingLauncher

/**
 * This class is in charge of launching the user store provision spring batch jobs.
 *
 * <p>{@link #launch()} lists the files currently in the configured directory and does nothing when there are none.
 * Otherwise it runs the job with the file names stored under {@link #USERSTORE_FILE_NAMES_KEY}, with one exception:
 * when the latest execution of the most recent job instance ended with exit code {@code FAILED}, that instance's file
 * names are reused so the same job instance is launched again (restarted).</p>
 *
 * @author Chris Shayan
 */
public class SchedulingLauncher implements InitializingBean {
    private static final Logger LOG = Logger.getLogger(SchedulingLauncher.class);
    public static final String USER_STORE_PROVISIONING = "userStoreProvisioning";

    @Autowired
    JobExplorer jobExplorer;

    private Resource resourcesPath;
    private Job job;
    private JobLauncher jobLauncher;

    /**
     * It is a field to be used as a key for data sharing between
     * {@link org.springframework.batch.item.file.MultiResourceItemReader} and {@link UserStoreBatchCleanupTasklet}.
     * Its value is the list of file names to process, each preceded by a comma.
     */
    public static final String USERSTORE_FILE_NAMES_KEY = String.format("%s.userstore_file_names_key", SchedulingLauncher.class.getSimpleName());

    /**
     * Fails fast when the input directory has not been configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Validate.notNull(resourcesPath, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]");
    }

    /**
     * @param resources directory that is scanned for input files
     */
    public void setResources(Resource resources) {
        this.resourcesPath = resources;
    }

    /**
     * Launches the job when the configured directory contains any entries. According to the original documentation,
     * it is scheduled from batch.xml.
     *
     * @throws Exception See {@link Exception}; propagated from the directory listing or from {@link JobLauncher#run}
     */
    public void launch() throws Exception {
        // List the directory once, so the emptiness check and the job parameters see the same files.
        final String fileNames = getFileNames();
        if (StringUtils.isBlank(fileNames)) {
            // if there is no files in the folder, do not launch the job.
            LOG.info("There is no files in the specific folder to be processed.");
            return;
        }
        jobLauncher.run(job, getJobParameters(fileNames));
    }

    /**
     * Lists the names of all entries directly inside the configured directory.
     *
     * <p>NOTE(review): subdirectories and files that do not match the reader's resource pattern are listed too. The
     * cleanup step deletes whatever is listed here.</p>
     *
     * @return every name preceded by a comma (e.g. {@code ",a.txt,b.txt"}), or {@code ""} when the directory is empty or
     *         cannot be listed. The format must stay stable because it is stored as an identifying job parameter.
     * @throws IOException if the resource cannot be resolved to a file
     */
    private String getFileNames() throws IOException {
        final File[] fileList = resourcesPath.getFile().listFiles();
        if (fileList == null) {
            return "";
        }
        final StringBuilder names = new StringBuilder();
        for (File file : fileList) {
            names.append(',').append(file.getName());
        }
        return names.toString();
    }

    /**
     * Chooses the job parameters for this launch.
     *
     * @param currentFileNames file names found in the directory for this launch
     * @return the previous instance's file names if that instance's latest execution FAILED (so it is restarted);
     *         otherwise parameters built from {@code currentFileNames}
     */
    private JobParameters getJobParameters(String currentFileNames) {
        final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(USER_STORE_PROVISIONING, 0, 1);
        if (CollectionUtils.isNotEmpty(userStoreJobInstances)) {
            final JobInstance lastJobInstance = userStoreJobInstances.get(0);
            final List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance);

            // Index 0 is taken as the latest execution (newest first), as in the original code.
            if (CollectionUtils.isNotEmpty(jobExecutions) && hasFailed(jobExecutions.get(0))) {
                final JobParameters lastJobInstanceJobParameters = lastJobInstance.getJobParameters();
                return new JobParametersBuilder()
                        .addString(USERSTORE_FILE_NAMES_KEY, lastJobInstanceJobParameters.getString(USERSTORE_FILE_NAMES_KEY))
                        .toJobParameters();
            }
        }

        return getDefaultJobParameters(currentFileNames);
    }

    /**
     * @param fileNames file names to be processed
     * @return default JobParameter which consist of the files' name to be processed.
     */
    private JobParameters getDefaultJobParameters(String fileNames) {
        return new JobParametersBuilder()
                .addString(USERSTORE_FILE_NAMES_KEY, fileNames)
                .toJobParameters();
    }

    /**
     * Checks the exit code only. {@code ExitStatus.equals} also compares the exit description, and a failed execution
     * typically carries one (e.g. the exception), so {@code ExitStatus.FAILED.equals(...)} would not match it.
     */
    private static boolean hasFailed(JobExecution execution) {
        final ExitStatus exitStatus = execution.getExitStatus();
        return exitStatus != null && ExitStatus.FAILED.getExitCode().equals(exitStatus.getExitCode());
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }
}

I hope this code walkthrough serves as a quick guide to using Spring Batch in your own use cases. I would also like to thank one of my teammates, Andy Ng, who contributed a lot to the implementation of this use case.

Published at DZone with permission of Chris Shayan, author and DZone MVB.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)